Skip to content

Commit

Permalink
feat(controlpanel): control Panel 中的暂停流功能的实现
Browse files Browse the repository at this point in the history
  • Loading branch information
KonghaYao committed Jul 19, 2021
1 parent 0fd690e commit fa35464
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 35 deletions.
26 changes: 17 additions & 9 deletions src/ControlPanel/ControlPanel.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,30 @@ import staticEvent from './StaticEvent';
import { TaskManager } from './TaskManager';
import { functionQueue } from '../utils/functionQueue';
import { EventHub } from './EventHub';
import { takeUntil } from 'rxjs/operators';
import { pauseWhile } from '../utils/pauseWhile';

// ControlPanel 是 JSpider 内部的事件和数据中心。
// 全部 JSpider 涉及到的边界中,ControlPanel 只有一个,但是 View 可以有多个,而 Spider 就是 View 中的一个
// 用于分发数据流,提供 Task 的状态变更。
// TODO 并且可以提供数据的响应给类似于 UI 界面形成可视化

export class ControlPanel {
state = 'free'; // 'free' 'preparing'
#runningQueue = new functionQueue(); // 准备阶段的 Queue 队列
#stopFlow = null;
#runningQueue = new functionQueue(); // init 阶段的 Queue 队列
#stop = false; // 用于直接切断 spiderSource$ 的流
spiderSource$ = null;
_pipeline = null;
TaskManager = new TaskManager();
constructor() {
this.$EventHub = new EventHub(staticEvent, this);
this.#stopFlow = this.$EventHub.createSource$('stopFlow');
this.#stopFlow = this.$EventHub.createSource$('Flow:stop');
}
// 这是一个完整的流控
_createLogicLine() {
this.spiderSource$ = this.$EventHub
.createSource$('runPipeline')
.pipe(this._pipeline.operator, takeUntil(this.#stopFlow));
this.spiderSource$ = this.$EventHub.createSource$('Flow:start').pipe(
this._pipeline.operator,
pauseWhile(() => this.#stop),
);
this.spiderSource$.subscribe(
// 所有的事件分配到 staticEvent 中去写
(task) => this.$EventHub.emit('Task:success', task),
Expand Down Expand Up @@ -50,12 +53,17 @@ export class ControlPanel {

// startInfo --TaskManager.createTask--> Task --emit--EventHub--> Flow
createFlow(infos) {
this.#runningQueue.enQueue(() => {
this.$EventHub.emit('Flow:start');
return this.#runningQueue.enQueue(() => {
infos.forEach((info) => {
if (!this._pipeline) throw new Error('没有创建pipeline');
const task = this.TaskManager.createTask(info, this._pipeline.UUID);
this.$EventHub.emit('runPipeline', task);
this.$EventHub.emit('Flow:start', task);
});
});
}
// TEST 测试功能
stopFlow() {
this.$EventHub.emit('Flow:stop');
}
}
22 changes: 14 additions & 8 deletions src/ControlPanel/EventHub.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
import { fromEventPattern } from 'rxjs';

import { memoize } from 'lodash-es';
export class EventHub {
all = new Map();
constructor(eventMap = {}, bindThis = null) {
this.bindThis = bindThis || globalThis;
this.on(eventMap);

// 创建一个 rxjs 流源头
this.createSource$ = memoize((eventName) => {
return fromEventPattern(
(handle) => this.on(eventName, handle),
(handle) => this.off(eventName, handle),
);
});
}
#on(type, handler) {
const handlers = this.all.get(type);
Expand Down Expand Up @@ -46,11 +54,9 @@ export class EventHub {
})
: [];
}
// 创建一个 rxjs 流源头
createSource$(eventName) {
return fromEventPattern(
(handle) => this.on(eventName, handle),
(handle) => this.off(eventName, handle),
);
}

operators = {
// TODO EventHub 中对于 rxjs 流的支持
EmitWhen(config) {},
};
}
6 changes: 6 additions & 0 deletions src/ControlPanel/StaticEvent.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ export default {
stateChange(state) {
this.state = state;
},
'Flow:stop'() {
this.#stop = true;
},
'Flow:start'() {
this.#stop = false;
},
// runPipeline() {}, // 这个函数没有必要监听是因为 rxjs 代理了这个事件
'Task:success'(task) {},
'Task:error'(error) {
Expand Down
16 changes: 11 additions & 5 deletions src/Spider/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* @Author: KonghaYao
* @Date: 2021-07-13 15:33:08
* @Last Modified by: KonghaYao
* @Last Modified time: 2021-07-18 09:43:40
* @Last Modified time: 2021-07-19 11:26:55
*/
// Spider 是 JSpider 的爬虫流程工具,主要目的是爬取文件,负责如何爬取文件与数据。
import { EventHub } from '../ControlPanel/EventHub.js';
Expand All @@ -12,20 +12,26 @@ import ControlPanel from '../ControlPanel/index.js';

// TODO 未完成接口的接入
export class Spider {
infoList = [];
infoList = new WeakSet(); // Client 端的缓存数据
constructor(config = {}) {
this.config = config;
this.EventHub = new EventHub(staticEvent, this); // 注册静态事件
}
crawl(...args) {
this.infoList.push(...args.flat());
this.infoList.set(...args.flat());
return this;
}
pipeline(...plugins) {
ControlPanel.pipeline = new Pipeline(plugins);
}
start() {
ControlPanel.createFlow(this.infoList);
ControlPanel.createFlow(this.infoList).enQueue(() => {
// 清除缓存
this.infoList.clear();
this.infoList = new WeakSet();
});
}
stop() {
ControlPanel.stopFlow();
}
stop() {}
}
1 change: 1 addition & 0 deletions src/utils/functionQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ export class functionQueue {
this.QueuePromise = args.reduce((promise, current) => {
return promise.then(current);
}, this.QueuePromise);
return this;
}
}
15 changes: 15 additions & 0 deletions src/utils/pauseWhile.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
export const pauseWhile = (pauseFunc) => {
const cache = [];
return (source) =>
source.pipe(
switchMap((value) => {
if (pauseFunc(value)) {
cache.push(value);
console.log('save ', value);
return EMPTY;
} else {
return from([...cache, value]).pipe(tap(() => (cache.length = 0)));
}
}),
);
};
41 changes: 28 additions & 13 deletions test/test.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const mitt = require('mitt');
const { iif } = require('rxjs');
const { EMPTY } = require('rxjs');
const { of, from, pipe, fromEventPattern, timer, interval } = require('rxjs');
const {
Expand All @@ -20,19 +21,33 @@ const {
bufferToggle,
bufferTime,
takeUntil,
switchMapTo,
exhaustMap,
} = require('rxjs/operators');
const emitter = mitt();
const lastTime = 0;
const stopFlow = fromEventPattern((handle) => emitter.on('stopFlow', handle));
const source$ = fromEventPattern((handle) => emitter.on('start', handle)).pipe(
bufferTime(1000, undefined, 3),
takeUntil(stopFlow),
);

source$.subscribe((i) => console.log(new Date().getTime(), i));
const pause$ = fromEventPattern((handle) => emitter.on('pause', handle));
const pauseWhile = (pauseFunc) => {
const cache = [];
return (source) =>
source.pipe(
switchMap((value) => {
if (pauseFunc(value)) {
cache.push(value);
console.log('save ', value);
return EMPTY;
} else {
return from([...cache, value]).pipe(tap(() => (cache.length = 0)));
}
}),
);
};

[...Array(10).keys()].forEach((i) => emitter.emit('start', i * 100));

setTimeout(() => {
emitter.emit('stopFlow');
}, 4000);
const source$ = interval(300).pipe(
pauseWhile((val) => {
return val % 5;
}),
take(14),
);
const startTime = new Date().getTime();
const compare = () => Math.ceil((new Date().getTime() - startTime) / 100);
source$.subscribe((val) => console.log(compare(), val));

0 comments on commit fa35464

Please sign in to comment.