From fa354641d8477255524df3342702965272e80106 Mon Sep 17 00:00:00 2001 From: KonghaYao <20192831006@m.scnu.edu.cn> Date: Mon, 19 Jul 2021 11:29:14 +0800 Subject: [PATCH] =?UTF-8?q?feat(controlpanel):=20control=20Panel=20?= =?UTF-8?q?=E4=B8=AD=E7=9A=84=E6=9A=82=E5=81=9C=E6=B5=81=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=E7=9A=84=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/ControlPanel/ControlPanel.js | 26 +++++++++++++------- src/ControlPanel/EventHub.js | 22 ++++++++++------- src/ControlPanel/StaticEvent.js | 6 +++++ src/Spider/index.js | 16 +++++++++---- src/utils/functionQueue.js | 1 + src/utils/pauseWhile.js | 15 ++++++++++++ test/test.js | 41 ++++++++++++++++++++++---------- 7 files changed, 92 insertions(+), 35 deletions(-) create mode 100644 src/utils/pauseWhile.js diff --git a/src/ControlPanel/ControlPanel.js b/src/ControlPanel/ControlPanel.js index a217c02..6ab6e24 100644 --- a/src/ControlPanel/ControlPanel.js +++ b/src/ControlPanel/ControlPanel.js @@ -2,7 +2,8 @@ import staticEvent from './StaticEvent'; import { TaskManager } from './TaskManager'; import { functionQueue } from '../utils/functionQueue'; import { EventHub } from './EventHub'; -import { takeUntil } from 'rxjs/operators'; +import { pauseWhile } from '../utils/pauseWhile'; + // ControlPanel 是 JSpider 内部的事件和数据中心。 // 全部 JSpider 涉及到的边界中,ControlPanel 只有一个,但是 View 可以有多个,而 Spider 就是 View 中的一个 // 用于分发数据流,提供 Task 的状态变更。 @@ -10,19 +11,21 @@ import { takeUntil } from 'rxjs/operators'; export class ControlPanel { state = 'free'; // 'free' 'preparing' - #runningQueue = new functionQueue(); // 准备阶段的 Queue 队列 - #stopFlow = null; + #runningQueue = new functionQueue(); // init 阶段的 Queue 队列 + #stop = false; // 用于直接切断 spiderSource$ 的流 + spiderSource$ = null; _pipeline = null; TaskManager = new TaskManager(); constructor() { this.$EventHub = new EventHub(staticEvent, this); - this.#stopFlow = this.$EventHub.createSource$('stopFlow'); + this.#stopFlow = this.$EventHub.createSource$('Flow:stop'); } // 这是一个完整的流控 _createLogicLine() { - this.spiderSource$ = this.$EventHub - .createSource$('runPipeline') - .pipe(this._pipeline.operator, takeUntil(this.#stopFlow)); + this.spiderSource$ = this.$EventHub.createSource$('Flow:start').pipe( + this._pipeline.operator, + pauseWhile(() => this.#stop), + ); this.spiderSource$.subscribe( // 所有的事件分配到 staticEvent 中去写 (task) => this.$EventHub.emit('Task:success', task), @@ -50,12 +53,17 @@ export class ControlPanel { // startInfo --TaskManager.createTask--> Task --emit--EventHub--> Flow createFlow(infos) { - this.#runningQueue.enQueue(() => { + this.$EventHub.emit('Flow:start'); + return this.#runningQueue.enQueue(() => { infos.forEach((info) => { if (!this._pipeline) throw new Error('没有创建pipeline'); const task = this.TaskManager.createTask(info, this._pipeline.UUID); - this.$EventHub.emit('runPipeline', task); + this.$EventHub.emit('Flow:start', task); }); }); } + // TEST 测试功能 + stopFlow() { + this.$EventHub.emit('Flow:stop'); + } } diff --git a/src/ControlPanel/EventHub.js b/src/ControlPanel/EventHub.js index 5f19505..2e3b47e 100644 --- a/src/ControlPanel/EventHub.js +++ b/src/ControlPanel/EventHub.js @@ -1,10 +1,18 @@ import { fromEventPattern } from 'rxjs'; - +import { memoize } from 'lodash-es'; export class EventHub { all = new Map(); constructor(eventMap = {}, bindThis = null) { this.bindThis = bindThis || globalThis; this.on(eventMap); + + // 创建一个 rxjs 流源头 + this.createSource$ = memoize((eventName) => { + return fromEventPattern( + (handle) => this.on(eventName, handle), + (handle) => this.off(eventName, handle), + ); + }); } #on(type, handler) { const handlers = this.all.get(type); @@ -46,11 +54,9 @@ export class EventHub { }) : []; } - // 创建一个 rxjs 流源头 - createSource$(eventName) { - return fromEventPattern( - (handle) => this.on(eventName, handle), - (handle) => this.off(eventName, handle), - ); - } + + operators = { + // TODO EventHub 中对于 rxjs 流的支持 + EmitWhen(config) {}, + }; } diff --git a/src/ControlPanel/StaticEvent.js b/src/ControlPanel/StaticEvent.js index f98eb33..8c4c77b 100644 --- a/src/ControlPanel/StaticEvent.js +++ b/src/ControlPanel/StaticEvent.js @@ -5,6 +5,12 @@ export default { stateChange(state) { this.state = state; }, + 'Flow:stop'() { + this.#stop = true; + }, + 'Flow:start'() { + this.#stop = false; + }, // runPipeline() {}, // 这个函数没有必要监听是因为 rxjs 代理了这个事件 'Task:success'(task) {}, 'Task:error'(error) { diff --git a/src/Spider/index.js b/src/Spider/index.js index 50b52df..764d546 100644 --- a/src/Spider/index.js +++ b/src/Spider/index.js @@ -2,7 +2,7 @@ * @Author: KonghaYao * @Date: 2021-07-13 15:33:08 * @Last Modified by: KonghaYao - * @Last Modified time: 2021-07-18 09:43:40 + * @Last Modified time: 2021-07-19 11:26:55 */ // Spider 是 JSpider 的爬虫流程工具,主要目的是爬取文件,负责如何爬取文件与数据。 import { EventHub } from '../ControlPanel/EventHub.js'; @@ -12,20 +12,26 @@ import ControlPanel from '../ControlPanel/index.js'; // TODO 未完成接口的接入 export class Spider { - infoList = []; + infoList = new WeakSet(); // Client 端的缓存数据 constructor(config = {}) { this.config = config; this.EventHub = new EventHub(staticEvent, this); // 注册静态事件 } crawl(...args) { - this.infoList.push(...args.flat()); + this.infoList.set(...args.flat()); return this; } pipeline(...plugins) { ControlPanel.pipeline = new Pipeline(plugins); } start() { - ControlPanel.createFlow(this.infoList); + ControlPanel.createFlow(this.infoList).enQueue(() => { + // 清除缓存 + this.infoList.clear(); + this.infoList = new WeakSet(); + }); + } + stop() { + ControlPanel.stopFlow(); } - stop() {} } diff --git a/src/utils/functionQueue.js b/src/utils/functionQueue.js index 08b9523..4951d88 100644 --- a/src/utils/functionQueue.js +++ b/src/utils/functionQueue.js @@ -10,5 +10,6 @@ export class functionQueue { this.QueuePromise = args.reduce((promise, current) => { return promise.then(current); }, this.QueuePromise); + return this; } } diff --git a/src/utils/pauseWhile.js b/src/utils/pauseWhile.js new file mode 100644 index 0000000..3ecb8f5 --- /dev/null +++ b/src/utils/pauseWhile.js @@ -0,0 +1,15 @@ +export const pauseWhile = (pauseFunc) => { + const cache = []; + return (source) => + source.pipe( + switchMap((value) => { + if (pauseFunc(value)) { + cache.push(value); + console.log('save ', value); + return EMPTY; + } else { + return from([...cache, value]).pipe(tap(() => (cache.length = 0))); + } + }), + ); +}; diff --git a/test/test.js b/test/test.js index 4711778..4c6031d 100644 --- a/test/test.js +++ b/test/test.js @@ -1,4 +1,5 @@ const mitt = require('mitt'); +const { iif } = require('rxjs'); const { EMPTY } = require('rxjs'); const { of, from, pipe, fromEventPattern, timer, interval } = require('rxjs'); const { @@ -20,19 +21,33 @@ const { bufferToggle, bufferTime, takeUntil, + switchMapTo, + exhaustMap, } = require('rxjs/operators'); const emitter = mitt(); -const lastTime = 0; -const stopFlow = fromEventPattern((handle) => emitter.on('stopFlow', handle)); -const source$ = fromEventPattern((handle) => emitter.on('start', handle)).pipe( - bufferTime(1000, undefined, 3), - takeUntil(stopFlow), -); - -source$.subscribe((i) => console.log(new Date().getTime(), i)); +const pause$ = fromEventPattern((handle) => emitter.on('pause', handle)); +const pauseWhile = (pauseFunc) => { + const cache = []; + return (source) => + source.pipe( + switchMap((value) => { + if (pauseFunc(value)) { + cache.push(value); + console.log('save ', value); + return EMPTY; + } else { + return from([...cache, value]).pipe(tap(() => (cache.length = 0))); + } + }), + ); +}; -[...Array(10).keys()].forEach((i) => emitter.emit('start', i * 100)); - -setTimeout(() => { - emitter.emit('stopFlow'); -}, 4000); +const source$ = interval(300).pipe( + pauseWhile((val) => { + return val % 5; + }), + take(14), +); +const startTime = new Date().getTime(); +const compare = () => Math.ceil((new Date().getTime() - startTime) / 100); +source$.subscribe((val) => console.log(compare(), val));