From 1b3c976650e94f9ae029dac6af803a51b12c6588 Mon Sep 17 00:00:00 2001 From: KonghaYao <3446798488@qq.com> Date: Sat, 7 Aug 2021 17:09:32 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=20Combine=20?= =?UTF-8?q?=E5=92=8C=20Break=20plugin?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dist/JSpider.esm.min.js | 154 +++++++++++++++++------------------ dist/JSpider.min.js | 154 +++++++++++++++++------------------ package/plugins/Combine.js | 44 ++++++++++ package/plugins/Download.js | 2 +- package/plugins/index.js | 1 + package/plugins/zipFile.js | 22 +---- src/Pipeline/PluginSystem.js | 2 +- src/Pipeline/index.js | 3 +- src/TaskSystem/TaskGroup.js | 30 ++++--- test/Request.js | 31 ++++--- test/text-bilibili.js | 19 +++-- 11 files changed, 248 insertions(+), 214 deletions(-) create mode 100644 package/plugins/Combine.js diff --git a/dist/JSpider.esm.min.js b/dist/JSpider.esm.min.js index a4d46ed..681d458 100644 --- a/dist/JSpider.esm.min.js +++ b/dist/JSpider.esm.min.js @@ -13263,7 +13263,7 @@ function scheduled(input, scheduler) { } /** PURE_IMPORTS_START _Observable,_util_subscribeTo,_scheduled_scheduled PURE_IMPORTS_END */ -function from(input, scheduler) { +function from$1(input, scheduler) { if (!scheduler) { if (input instanceof Observable) { return input; @@ -13328,7 +13328,7 @@ function mergeMap(project, resultSelector, concurrent) { concurrent = Number.POSITIVE_INFINITY; } if (typeof resultSelector === 'function') { - return function (source) { return source.pipe(mergeMap(function (a, i) { return from(project(a, i)).pipe(map(function (b, ii) { return resultSelector(a, b, i, ii); })); }, concurrent)); }; + return function (source) { return source.pipe(mergeMap(function (a, i) { return from$1(project(a, i)).pipe(map(function (b, ii) { return resultSelector(a, b, i, ii); })); }, concurrent)); }; } else if (typeof resultSelector === 'number') { concurrent = resultSelector; @@ -16113,7 +16113,7 @@ function share() { /** PURE_IMPORTS_START tslib,_map,_observable_from,_innerSubscribe PURE_IMPORTS_END */ function switchMap(project, resultSelector) { if (typeof resultSelector === 'function') { - return function (source) { return source.pipe(switchMap(function (a, i) { return from(project(a, i)).pipe(map(function (b, ii) { return resultSelector(a, b, i, ii); })); })); }; + return function (source) { return source.pipe(switchMap(function (a, i) { return from$1(project(a, i)).pipe(map(function (b, ii) { return resultSelector(a, b, i, ii); })); })); }; } return function (source) { return source.lift(new SwitchMapOperator(project)); }; } @@ -16376,37 +16376,45 @@ class Task { * Copyright 2021 KonghaYao 江夏尧 * SPDX-License-Identifier: Apache-2.0 */ -class TaskGroup$1 extends Task { +class TaskGroup extends Task { constructor(TaskArray, spiderUUID = '00000') { - const output = new Set(TaskArray); - super(output, TaskArray?.[0]?.spiderUUID || spiderUUID); + super({}, TaskArray?.[0]?.spiderUUID || spiderUUID); + this.member = new Set(TaskArray); this.#linkTask(); } #linkTask() { - this.originData.forEach((task) => { + this.member.forEach((task) => { task._belongTo = this; task.$on('destroy', () => this.$removeLink(task)); }); } + + // 当第一次 start 的时候,返回的是完全不一样的 Task 的信息 + // 经过第一个 start 之后,所有 Task 中的数据被统一,这个标识改为 true + consume = false; // Plugin 的汇报口 $commit(type, ...payload) { - // 扩散事件 + // 扩散事件 Set 类型不能 map const result = []; - this.originData.forEach((task) => result.push(task.$commit(type, ...payload))); - const selfOutput = this.$EventHub.emit(type, ...payload); + this.member.forEach((task) => result.push(task.$commit(type, ...payload))); + this.$EventHub.emit(type, ...payload); - if (type === 'start' && this._output) return selfOutput[0]; + if (this.consume) return result[0]; + if (type === 'start') this.consume = true; return result; } // 删除所有的 link $destroy() { - const copy = [...this.originData]; - this.$EventHub.emit('destroy'); // 不进行事件的扩散, 只是自身的报销 - return copy; + this.$EventHub.emit('destroy'); // 不进行事件的扩散, 只是自身的报销的消息被传开 + this.$store.destroy(); + const tasks = this.member; + this.member = null; + + return tasks; } // 单独删除一个连接 $removeLink(task) { - this.originData.delete(task); + this.member.delete(task); } get [Symbol.toStringTag]() { return 'TaskGroup'; @@ -16479,7 +16487,7 @@ var staticEvent = { // 'Spider:clearFlow'(){} 'Task:success'(task) { task.$commit('complete'); - if (task instanceof TaskGroup$1) { + if (task instanceof TaskGroup) { task.$destroy(); } MessageHub.emit('ControlUpdate', { @@ -16629,7 +16637,8 @@ class Pipeline { let uuidString = ''; // ! 一次遍历实现取出 operator 和 导出 plugin init 函数的 promise 链,并延长 uuidString 用于创建 UUID const pipeline = this.Plugins.map((plugin, index) => { - uuidString += plugin.main.toString(); + // main 属性和 operator 属性必须存在一个来保证能够生成 uuid + uuidString += (plugin?.main || plugin.operator).toString(); if (plugin.init instanceof Function) { this.#PluginQueue.enQueue(plugin.init); @@ -16871,7 +16880,7 @@ class PLUGIN$1 { }); } initUUID(index) { - this.uuid = createUUID(this.main.toString() + index); + this.uuid = createUUID((this?.main || this.operator).toString() + index); } // 对 main 函数外包一层,直接启动 main 函数的执行,返回一条流 TaskStarter(task) { @@ -16883,7 +16892,7 @@ class PLUGIN$1 { switchMap(([data, originData]) => { const result = this.main(data, originData); - return result instanceof Promise || result instanceof Observable ? from(result) : of(result); + return result instanceof Promise || result instanceof Observable ? from$1(result) : of(result); }), map((result) => { task.$commit('success', result, this.uuid, this.saveResult); @@ -17000,7 +17009,7 @@ function concurrent( filter((i) => i.length), // 无论如何每一组都会被推迟的时间量 delayWhen((_, index) => timer(index * delay)), - mergeMap((array) => from(array)), + mergeMap((array) => from$1(array)), concatMap(asyncSingle), ); } @@ -17143,7 +17152,7 @@ const aDownload = function (file) { console.log('%c 下载完成', 'color:green'); }; -const download = (data, { DownloadFileName: name }, originData) => { +const download = (data, { DownloadFileName: name } = {}, originData) => { const file = toFile(data, name || (typeof url === 'string' ? originData.url.replace(/[^\/]*?\//g, '') : '')); DownloadQueue.add(file); return null; @@ -17418,48 +17427,6 @@ async function zipper(fileArray, zipName) { return content; } -/** - * @license - * Copyright 2021 KonghaYao 江夏尧 - * SPDX-License-Identifier: Apache-2.0 - */ -class TaskGroup extends Task { - constructor(TaskArray, spiderUUID = '00000') { - const output = new Set(TaskArray); - super(output, TaskArray?.[0]?.spiderUUID || spiderUUID); - this.#linkTask(); - } - #linkTask() { - this.originData.forEach((task) => { - task._belongTo = this; - task.$on('destroy', () => this.$removeLink(task)); - }); - } - // Plugin 的汇报口 - $commit(type, ...payload) { - // 扩散事件 - const result = []; - this.originData.forEach((task) => result.push(task.$commit(type, ...payload))); - const selfOutput = this.$EventHub.emit(type, ...payload); - - if (type === 'start' && this._output) return selfOutput[0]; - return result; - } - // 删除所有的 link - $destroy() { - const copy = [...this.originData]; - this.$EventHub.emit('destroy'); // 不进行事件的扩散, 只是自身的报销 - return copy; - } - // 单独删除一个连接 - $removeLink(task) { - this.originData.delete(task); - } - get [Symbol.toStringTag]() { - return 'TaskGroup'; - } -} - /** * @license * Copyright 2021 KonghaYao 江夏尧 @@ -17471,30 +17438,53 @@ const ZipFile = function (options = {}) { return Plugin$1({ init, name: 'zipFile', - main(data) { + main(blob) { const { zipFileName } = this.options; - const files = data.map((blob) => toFile(blob, zipFileName)); + const files = toFile(blob, zipFileName); return zipper(files, zipFileName); }, options, + }); +}; + +/** + * @license + * Copyright 2021 KonghaYao 江夏尧 + * SPDX-License-Identifier: Apache-2.0 + */ +function Combine(number, waitTime = 5000, options = {}) { + return Plugin$1({ + name: 'Combine', // 这个 name 是负责进行监控的标志符号 + options, // 接收所有的参数,提供给所有函数使用 operator() { // 复写 operator - const { chunk = 3 } = this.options; return (source) => source.pipe( - bufferTime(5000, undefined, chunk), + bufferTime(waitTime, undefined, number), filter((i) => i.length), // 必须要进行检测是否为空 - concatMap((tasks) => - this.TaskStarter( - new TaskGroup(tasks), - - this.uuid, - ), - ), + concatMap((tasks) => of(new TaskGroup(tasks))), ); }, }); -}; +} +function Break() { + return Plugin$1({ + name: 'Break', // 这个 name 是负责进行监控的标志符号 + options, + operator() { + return (source) => + source.pipe( + concatMap((taskGroup) => { + if (taskGroup instanceof TaskGroup) { + return from(taskGroup.$destroy()); + } else { + return of(taskGroup); + } + }), + ); + }, + }); +} /** * @license @@ -17507,7 +17497,9 @@ var plugins = /*#__PURE__*/Object.freeze({ Request: Request, Download: Download, ExcelHelper: ExcelHelper, - ZipFile: ZipFile + ZipFile: ZipFile, + Combine: Combine, + Break: Break }); /** @@ -18151,7 +18143,7 @@ class PLUGIN { }); } initUUID(index) { - this.uuid = createUUID(this.main.toString() + index); + this.uuid = createUUID((this?.main || this.operator).toString() + index); } // 对 main 函数外包一层,直接启动 main 函数的执行,返回一条流 TaskStarter(task) { @@ -18163,7 +18155,7 @@ class PLUGIN { switchMap(([data, originData]) => { const result = this.main(data, originData); - return result instanceof Promise || result instanceof Observable ? from(result) : of(result); + return result instanceof Promise || result instanceof Observable ? from$1(result) : of(result); }), map((result) => { task.$commit('success', result, this.uuid, this.saveResult); @@ -18216,9 +18208,9 @@ var index = Object.assign(Spider, tools, { plugins, Plugin, Task, - TaskGroup: TaskGroup$1, - version: "3.2.0", - buildDate: new Date(1628303434439), + TaskGroup, + version: "3.2.1", + buildDate: new Date(1628326859357), }); export default index; diff --git a/dist/JSpider.min.js b/dist/JSpider.min.js index 519eef8..b07bff8 100644 --- a/dist/JSpider.min.js +++ b/dist/JSpider.min.js @@ -13266,7 +13266,7 @@ var JSpider = (function () { } /** PURE_IMPORTS_START _Observable,_util_subscribeTo,_scheduled_scheduled PURE_IMPORTS_END */ - function from(input, scheduler) { + function from$1(input, scheduler) { if (!scheduler) { if (input instanceof Observable) { return input; @@ -13331,7 +13331,7 @@ var JSpider = (function () { concurrent = Number.POSITIVE_INFINITY; } if (typeof resultSelector === 'function') { - return function (source) { return source.pipe(mergeMap(function (a, i) { return from(project(a, i)).pipe(map(function (b, ii) { return resultSelector(a, b, i, ii); })); }, concurrent)); }; + return function (source) { return source.pipe(mergeMap(function (a, i) { return from$1(project(a, i)).pipe(map(function (b, ii) { return resultSelector(a, b, i, ii); })); }, concurrent)); }; } else if (typeof resultSelector === 'number') { concurrent = resultSelector; @@ -16116,7 +16116,7 @@ var JSpider = (function () { /** PURE_IMPORTS_START tslib,_map,_observable_from,_innerSubscribe PURE_IMPORTS_END */ function switchMap(project, resultSelector) { if (typeof resultSelector === 'function') { - return function (source) { return source.pipe(switchMap(function (a, i) { return from(project(a, i)).pipe(map(function (b, ii) { return resultSelector(a, b, i, ii); })); })); }; + return function (source) { return source.pipe(switchMap(function (a, i) { return from$1(project(a, i)).pipe(map(function (b, ii) { return resultSelector(a, b, i, ii); })); })); }; } return function (source) { return source.lift(new SwitchMapOperator(project)); }; } @@ -16379,37 +16379,45 @@ var JSpider = (function () { * Copyright 2021 KonghaYao 江夏尧 * SPDX-License-Identifier: Apache-2.0 */ - class TaskGroup$1 extends Task { + class TaskGroup extends Task { constructor(TaskArray, spiderUUID = '00000') { - const output = new Set(TaskArray); - super(output, TaskArray?.[0]?.spiderUUID || spiderUUID); + super({}, TaskArray?.[0]?.spiderUUID || spiderUUID); + this.member = new Set(TaskArray); this.#linkTask(); } #linkTask() { - this.originData.forEach((task) => { + this.member.forEach((task) => { task._belongTo = this; task.$on('destroy', () => this.$removeLink(task)); }); } + + // 当第一次 start 的时候,返回的是完全不一样的 Task 的信息 + // 经过第一个 start 之后,所有 Task 中的数据被统一,这个标识改为 true + consume = false; // Plugin 的汇报口 $commit(type, ...payload) { - // 扩散事件 + // 扩散事件 Set 类型不能 map const result = []; - this.originData.forEach((task) => result.push(task.$commit(type, ...payload))); - const selfOutput = this.$EventHub.emit(type, ...payload); + this.member.forEach((task) => result.push(task.$commit(type, ...payload))); + this.$EventHub.emit(type, ...payload); - if (type === 'start' && this._output) return selfOutput[0]; + if (this.consume) return result[0]; + if (type === 'start') this.consume = true; return result; } // 删除所有的 link $destroy() { - const copy = [...this.originData]; - this.$EventHub.emit('destroy'); // 不进行事件的扩散, 只是自身的报销 - return copy; + this.$EventHub.emit('destroy'); // 不进行事件的扩散, 只是自身的报销的消息被传开 + this.$store.destroy(); + const tasks = this.member; + this.member = null; + + return tasks; } // 单独删除一个连接 $removeLink(task) { - this.originData.delete(task); + this.member.delete(task); } get [Symbol.toStringTag]() { return 'TaskGroup'; @@ -16482,7 +16490,7 @@ var JSpider = (function () { // 'Spider:clearFlow'(){} 'Task:success'(task) { task.$commit('complete'); - if (task instanceof TaskGroup$1) { + if (task instanceof TaskGroup) { task.$destroy(); } MessageHub.emit('ControlUpdate', { @@ -16632,7 +16640,8 @@ var JSpider = (function () { let uuidString = ''; // ! 一次遍历实现取出 operator 和 导出 plugin init 函数的 promise 链,并延长 uuidString 用于创建 UUID const pipeline = this.Plugins.map((plugin, index) => { - uuidString += plugin.main.toString(); + // main 属性和 operator 属性必须存在一个来保证能够生成 uuid + uuidString += (plugin?.main || plugin.operator).toString(); if (plugin.init instanceof Function) { this.#PluginQueue.enQueue(plugin.init); @@ -16874,7 +16883,7 @@ var JSpider = (function () { }); } initUUID(index) { - this.uuid = createUUID(this.main.toString() + index); + this.uuid = createUUID((this?.main || this.operator).toString() + index); } // 对 main 函数外包一层,直接启动 main 函数的执行,返回一条流 TaskStarter(task) { @@ -16886,7 +16895,7 @@ var JSpider = (function () { switchMap(([data, originData]) => { const result = this.main(data, originData); - return result instanceof Promise || result instanceof Observable ? from(result) : of(result); + return result instanceof Promise || result instanceof Observable ? from$1(result) : of(result); }), map((result) => { task.$commit('success', result, this.uuid, this.saveResult); @@ -17003,7 +17012,7 @@ var JSpider = (function () { filter((i) => i.length), // 无论如何每一组都会被推迟的时间量 delayWhen((_, index) => timer(index * delay)), - mergeMap((array) => from(array)), + mergeMap((array) => from$1(array)), concatMap(asyncSingle), ); } @@ -17146,7 +17155,7 @@ var JSpider = (function () { console.log('%c 下载完成', 'color:green'); }; - const download = (data, { DownloadFileName: name }, originData) => { + const download = (data, { DownloadFileName: name } = {}, originData) => { const file = toFile(data, name || (typeof url === 'string' ? originData.url.replace(/[^\/]*?\//g, '') : '')); DownloadQueue.add(file); return null; @@ -17421,48 +17430,6 @@ var JSpider = (function () { return content; } - /** - * @license - * Copyright 2021 KonghaYao 江夏尧 - * SPDX-License-Identifier: Apache-2.0 - */ - class TaskGroup extends Task { - constructor(TaskArray, spiderUUID = '00000') { - const output = new Set(TaskArray); - super(output, TaskArray?.[0]?.spiderUUID || spiderUUID); - this.#linkTask(); - } - #linkTask() { - this.originData.forEach((task) => { - task._belongTo = this; - task.$on('destroy', () => this.$removeLink(task)); - }); - } - // Plugin 的汇报口 - $commit(type, ...payload) { - // 扩散事件 - const result = []; - this.originData.forEach((task) => result.push(task.$commit(type, ...payload))); - const selfOutput = this.$EventHub.emit(type, ...payload); - - if (type === 'start' && this._output) return selfOutput[0]; - return result; - } - // 删除所有的 link - $destroy() { - const copy = [...this.originData]; - this.$EventHub.emit('destroy'); // 不进行事件的扩散, 只是自身的报销 - return copy; - } - // 单独删除一个连接 - $removeLink(task) { - this.originData.delete(task); - } - get [Symbol.toStringTag]() { - return 'TaskGroup'; - } - } - /** * @license * Copyright 2021 KonghaYao 江夏尧 @@ -17474,30 +17441,53 @@ var JSpider = (function () { return Plugin$1({ init, name: 'zipFile', - main(data) { + main(blob) { const { zipFileName } = this.options; - const files = data.map((blob) => toFile(blob, zipFileName)); + const files = toFile(blob, zipFileName); return zipper(files, zipFileName); }, options, + }); + }; + + /** + * @license + * Copyright 2021 KonghaYao 江夏尧 + * SPDX-License-Identifier: Apache-2.0 + */ + function Combine(number, waitTime = 5000, options = {}) { + return Plugin$1({ + name: 'Combine', // 这个 name 是负责进行监控的标志符号 + options, // 接收所有的参数,提供给所有函数使用 operator() { // 复写 operator - const { chunk = 3 } = this.options; return (source) => source.pipe( - bufferTime(5000, undefined, chunk), + bufferTime(waitTime, undefined, number), filter((i) => i.length), // 必须要进行检测是否为空 - concatMap((tasks) => - this.TaskStarter( - new TaskGroup(tasks), - - this.uuid, - ), - ), + concatMap((tasks) => of(new TaskGroup(tasks))), ); }, }); - }; + } + function Break() { + return Plugin$1({ + name: 'Break', // 这个 name 是负责进行监控的标志符号 + options, + operator() { + return (source) => + source.pipe( + concatMap((taskGroup) => { + if (taskGroup instanceof TaskGroup) { + return from(taskGroup.$destroy()); + } else { + return of(taskGroup); + } + }), + ); + }, + }); + } /** * @license @@ -17510,7 +17500,9 @@ var JSpider = (function () { Request: Request, Download: Download, ExcelHelper: ExcelHelper, - ZipFile: ZipFile + ZipFile: ZipFile, + Combine: Combine, + Break: Break }); /** @@ -18154,7 +18146,7 @@ var JSpider = (function () { }); } initUUID(index) { - this.uuid = createUUID(this.main.toString() + index); + this.uuid = createUUID((this?.main || this.operator).toString() + index); } // 对 main 函数外包一层,直接启动 main 函数的执行,返回一条流 TaskStarter(task) { @@ -18166,7 +18158,7 @@ var JSpider = (function () { switchMap(([data, originData]) => { const result = this.main(data, originData); - return result instanceof Promise || result instanceof Observable ? from(result) : of(result); + return result instanceof Promise || result instanceof Observable ? from$1(result) : of(result); }), map((result) => { task.$commit('success', result, this.uuid, this.saveResult); @@ -18219,9 +18211,9 @@ var JSpider = (function () { plugins, Plugin, Task, - TaskGroup: TaskGroup$1, - version: "3.2.0", - buildDate: new Date(1628303434439), + TaskGroup, + version: "3.2.1", + buildDate: new Date(1628326859357), }); return index; diff --git a/package/plugins/Combine.js b/package/plugins/Combine.js new file mode 100644 index 0000000..68af911 --- /dev/null +++ b/package/plugins/Combine.js @@ -0,0 +1,44 @@ +/** + * @license + * Copyright 2021 KonghaYao 江夏尧 + * SPDX-License-Identifier: Apache-2.0 + */ +/* eslint-disable no-invalid-this */ + +import { Plugin } from '@src/Pipeline/PluginSystem.js'; +import { of } from 'rxjs'; +import { bufferTime, concatMap, filter } from 'rxjs/operators'; +import { TaskGroup } from '../../src/TaskSystem/TaskGroup'; +export function Combine(number, waitTime = 5000, options = {}) { + return Plugin({ + name: 'Combine', // 这个 name 是负责进行监控的标志符号 + options, // 接收所有的参数,提供给所有函数使用 + operator() { + // 复写 operator + return (source) => + source.pipe( + bufferTime(waitTime, undefined, number), + filter((i) => i.length), // 必须要进行检测是否为空 + concatMap((tasks) => of(new TaskGroup(tasks))), + ); + }, + }); +} +export function Break() { + return Plugin({ + name: 'Break', // 这个 name 是负责进行监控的标志符号 + options, + operator() { + return (source) => + source.pipe( + concatMap((taskGroup) => { + if (taskGroup instanceof TaskGroup) { + return from(taskGroup.$destroy()); + } else { + return of(taskGroup); + } + }), + ); + }, + }); +} diff --git a/package/plugins/Download.js b/package/plugins/Download.js index 7dbfb2b..369388c 100644 --- a/package/plugins/Download.js +++ b/package/plugins/Download.js @@ -25,7 +25,7 @@ const aDownload = function (file) { console.log('%c 下载完成', 'color:green'); }; -const download = (data, { DownloadFileName: name }, originData) => { +const download = (data, { DownloadFileName: name } = {}, originData) => { const file = toFile(data, name || (typeof url === 'string' ? originData.url.replace(/[^\/]*?\//g, '') : '')); DownloadQueue.add(file); return null; diff --git a/package/plugins/index.js b/package/plugins/index.js index 288e4ef..a34334e 100644 --- a/package/plugins/index.js +++ b/package/plugins/index.js @@ -7,3 +7,4 @@ export * from './Request.js'; export * from './Download.js'; export * from './ExcelHelper.js'; export * from './zipFile.js'; +export * from './Combine.js'; diff --git a/package/plugins/zipFile.js b/package/plugins/zipFile.js index 54c73c2..81aea31 100644 --- a/package/plugins/zipFile.js +++ b/package/plugins/zipFile.js @@ -3,7 +3,6 @@ * Copyright 2021 KonghaYao 江夏尧 * SPDX-License-Identifier: Apache-2.0 */ -import { bufferTime, concatMap, filter } from 'rxjs/operators'; import { Plugin } from '@src/Pipeline/PluginSystem.js'; @@ -11,34 +10,17 @@ import { init } from './JSzip/JSzip.js'; import { zipper } from './JSzip/zipper.js'; import { toFile } from './utils/toFile.js'; -import { TaskGroup } from '@src/TaskSystem/TaskGroup.js'; export const ZipFile = function (options = {}) { if (!options.zipFileName) options.zipFileName = new Date().getTime(); return Plugin({ init, name: 'zipFile', - main(data) { + main(blob) { const { zipFileName } = this.options; - const files = data.map((blob) => toFile(blob, zipFileName)); + const files = toFile(blob, zipFileName); return zipper(files, zipFileName); }, options, - operator() { - // 复写 operator - const { chunk = 3 } = this.options; - return (source) => - source.pipe( - bufferTime(5000, undefined, chunk), - filter((i) => i.length), // 必须要进行检测是否为空 - concatMap((tasks) => - this.TaskStarter( - new TaskGroup(tasks), - - this.uuid, - ), - ), - ); - }, }); }; diff --git a/src/Pipeline/PluginSystem.js b/src/Pipeline/PluginSystem.js index 24a3eb3..fb2ffc6 100644 --- a/src/Pipeline/PluginSystem.js +++ b/src/Pipeline/PluginSystem.js @@ -56,7 +56,7 @@ class PLUGIN { }); } initUUID(index) { - this.uuid = createUUID(this.main.toString() + index); + this.uuid = createUUID((this?.main || this.operator).toString() + index); } // 对 main 函数外包一层,直接启动 main 函数的执行,返回一条流 TaskStarter(task) { diff --git a/src/Pipeline/index.js b/src/Pipeline/index.js index 612c09f..491d406 100644 --- a/src/Pipeline/index.js +++ b/src/Pipeline/index.js @@ -20,7 +20,8 @@ export class Pipeline { let uuidString = ''; // ! 一次遍历实现取出 operator 和 导出 plugin init 函数的 promise 链,并延长 uuidString 用于创建 UUID const pipeline = this.Plugins.map((plugin, index) => { - uuidString += plugin.main.toString(); + // main 属性和 operator 属性必须存在一个来保证能够生成 uuid + uuidString += (plugin?.main || plugin.operator).toString(); if (plugin.init instanceof Function) { this.#PluginQueue.enQueue(plugin.init); diff --git a/src/TaskSystem/TaskGroup.js b/src/TaskSystem/TaskGroup.js index 1fa0d4d..2aa692c 100644 --- a/src/TaskSystem/TaskGroup.js +++ b/src/TaskSystem/TaskGroup.js @@ -6,35 +6,43 @@ import { Task } from './Task'; export class TaskGroup extends Task { constructor(TaskArray, spiderUUID = '00000') { - const output = new Set(TaskArray); - super(output, TaskArray?.[0]?.spiderUUID || spiderUUID); + super({}, TaskArray?.[0]?.spiderUUID || spiderUUID); + this.member = new Set(TaskArray); this.#linkTask(); } #linkTask() { - this.originData.forEach((task) => { + this.member.forEach((task) => { task._belongTo = this; task.$on('destroy', () => this.$removeLink(task)); }); } + + // 当第一次 start 的时候,返回的是完全不一样的 Task 的信息 + // 经过第一个 start 之后,所有 Task 中的数据被统一,这个标识改为 true + consume = false; // Plugin 的汇报口 $commit(type, ...payload) { - // 扩散事件 + // 扩散事件 Set 类型不能 map const result = []; - this.originData.forEach((task) => result.push(task.$commit(type, ...payload))); - const selfOutput = this.$EventHub.emit(type, ...payload); + this.member.forEach((task) => result.push(task.$commit(type, ...payload))); + this.$EventHub.emit(type, ...payload); - if (type === 'start' && this._output) return selfOutput[0]; + if (this.consume) return result[0]; + if (type === 'start') this.consume = true; return result; } // 删除所有的 link $destroy() { - const copy = [...this.originData]; - this.$EventHub.emit('destroy'); // 不进行事件的扩散, 只是自身的报销 - return copy; + this.$EventHub.emit('destroy'); // 不进行事件的扩散, 只是自身的报销的消息被传开 + this.$store.destroy(); + const tasks = this.member; + this.member = null; + + return tasks; } // 单独删除一个连接 $removeLink(task) { - this.originData.delete(task); + this.member.delete(task); } get [Symbol.toStringTag]() { return 'TaskGroup'; diff --git a/test/Request.js b/test/Request.js index 9ed9720..93dd291 100644 --- a/test/Request.js +++ b/test/Request.js @@ -5,6 +5,7 @@ const { Download, // 下载库 ExcelHelper, // 转换数据为表格数据的插件 ZipFile, + Combine, } = JSpider.plugins; const { Plugin } = JSpider; @@ -19,20 +20,28 @@ export async function main() { logEvery: true, }); spider.pipeline( - Request({ delay: 2000, buffer: 2, retry: 3, handleError: null }), + Request({ delay: 100, buffer: 2, retry: 3, handleError: null }), - // ExcelHelper({ - // XLSXOptions: { - // bookType: 'csv', // 可以指定为 csv 或者 xlsx - // }, - // }), + Combine(3, 1000), + ExcelHelper( + (data) => { + return { + [new Date().getTime()]: data + .map((i) => i.data) + .flat() + .flat() + .flat(), + }; + }, + { + XLSXOptions: { + bookType: 'csv', // 可以指定为 csv 或者 xlsx + }, + }, + ), // ZipFile({ chunk: 2 }), - Plugin((data) => { - console.log(data); - return data; - }), - // Download(), + Download(), ); spider.crawl(urls); spider.start(); diff --git a/test/text-bilibili.js b/test/text-bilibili.js index b7efbc8..6927832 100644 --- a/test/text-bilibili.js +++ b/test/text-bilibili.js @@ -1,9 +1,11 @@ -import('https://cdn.jsdelivr.net/npm/js-spider@3.2/dist/JSpider.esm.min.js').then(async (res) => { +import('https://cdn.jsdelivr.net/npm/js-spider@3.2.1/dist/JSpider.esm.min.js').then(async (res) => { let JSpider = res.default; let { Plugin, - plugins: { ExcelHelper, Request, Download }, + plugins: { ExcelHelper, Request, Download, ZipFile }, } = JSpider; + await JSpider.$load('xlsx'); + await JSpider.$load('jszip'); let keyword = '奥运'; let first = await fetch( `https://api.bilibili.com/x/web-interface/search/type?context=&page=1&order=&keyword=${keyword}&duration=&tids_1=&tids_2=&from_source=&from_spmid=333.336&platform=pc&__refresh__=true&_extra=&search_type=video&highlight=1&single_column=0`, @@ -17,17 +19,20 @@ import('https://cdn.jsdelivr.net/npm/js-spider@3.2/dist/JSpider.esm.min.js').the url: `https://api.bilibili.com/x/web-interface/search/type?context=&page=${ i + 1 }&order=&keyword=${keyword}&duration=&tids_1=&tids_2=&from_source=&from_spmid=333.336&platform=pc&__refresh__=true&_extra=&search_type=video&highlight=1&single_column=0`, - options: { referrer: 'https://search.bilibili.com/all?keyword=%E5%A5%A5%E8%BF%90', method: 'GET' }, + options: { + referrer: 'https://search.bilibili.com/all?keyword=%E5%A5%A5%E8%BF%90', + method: 'GET', + }, }; }); - + window.Result = []; let spider = new JSpider() .pipeline( Request(), - ExcelHelper((data) => { - return data.data.result; + Plugin((data) => { + data.data.result.forEach((item) => ['hit_columns', 'new_rec_tags'].forEach((ii) => (item[ii] = ''))); + window.Result.push(data.data.result); }), - Download(), ) .crawl(urls) .start();