Skip to content

Commit

Permalink
feat(mirror): 数据放送接口
Browse files Browse the repository at this point in the history
  • Loading branch information
KonghaYao committed Aug 2, 2021
1 parent 0e17d63 commit 8ea3697
Show file tree
Hide file tree
Showing 19 changed files with 82 additions and 37,353 deletions.
18,632 changes: 0 additions & 18,632 deletions dist/JSpider.esm.min.js

Large diffs are not rendered by default.

18,637 changes: 0 additions & 18,637 deletions dist/JSpider.min.js

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions plugins/Download.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const download = (data, { DownloadFileName: name }, originData) => {
};
export const Download = function (options = {}) {
return Plugin({
name: 'Download',
main: download,
options,
});
Expand Down
1 change: 1 addition & 0 deletions plugins/ExcelHelper.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { init } from './ExcelHelper/xlsx.js';
// ExcelHelper 是将 Object => Book => File 用于下载的一个库
export const ExcelHelper = function (formatter, options = {}) {
return Plugin({
name: 'ExcelHelper',
init,
options,
main(data) {
Expand Down
1 change: 1 addition & 0 deletions plugins/HTMLParser.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const Parser = function (string, func, documentType = 'text/html') {
};
export function HTMLParser(callback, options = {}) {
return Plugin({
name: 'HTMLParser',
options,
main(data) {
const { formatter, documentType } = this.options;
Expand Down
1 change: 1 addition & 0 deletions plugins/Request.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ function HandleError(err) {
export function Request(options = {}) {
return Plugin({
init() {}, // 在所有工作开始前会启动的函数,可以用于 Promise 加载一些 js 插件
name: 'Request', // 这个 name 是负责进行监控的标志符号
main: request, // 功能性的核心函数
options, // 接收所有的参数,提供给所有函数使用

Expand Down
1 change: 1 addition & 0 deletions plugins/zipFile.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export const ZipFile = function (options = {}) {
if (!options.zipFileName) options.zipFileName = new Date().getTime();
return Plugin({
init,
name: 'zipFile',
main(data) {
const { zipFileName } = this.options;
const files = data.map((blob) => toFile(blob, zipFileName));
Expand Down
13 changes: 0 additions & 13 deletions src/ControlPanel/Mirror.js

This file was deleted.

21 changes: 11 additions & 10 deletions src/ControlPanel/TaskManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@
* Copyright 2021 KonghaYao 江夏尧 <dongzhongzhidong@qq.com>
* SPDX-License-Identifier: Apache-2.0
*/
/* eslint-disable no-invalid-this */
import { Task } from '../TaskSystem/Task';

function createTaskViewModel(task) {
// TODO 建立 Proxy 对象提供监控
return new Proxy(task, {});
}
import { MessageHub } from '../Mirror/Mirror.js';

// ! 用于维护全局 Task 数据的中心
export class TaskManager {
Expand All @@ -19,16 +16,20 @@ export class TaskManager {
// 这好比村长只管理村民人数和状况,但是不管理村民的组织
createTask(data, pipelineUUID) {
const task = new Task(data, pipelineUUID);
this.#Tasks.set(task._uuid, task);
this.#Tasks.set(task.uuid, task);
const that = this;
task.$on({
// 监听事件,并更新响应的 viewModel
destroy() {
this.#Tasks.delete(this._uuid); // this 绑定的是 task
that.#Tasks.delete(this.uuid); // this 绑定的是 task
},
});

const model = createTaskViewModel(task);
this.viewModel.push(model);
['start', 'success', 'complete', 'error'].forEach((name) => {
task.$on(name, function () {
MessageHub.emit('TaskUpdate', this.$store.$backup());
});
});
this.viewModel.push(task.$store);
return task;
}
}
21 changes: 21 additions & 0 deletions src/Mirror/Mirror.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/**
* @license
* Copyright 2021 KonghaYao 江夏尧 <dongzhongzhidong@qq.com>
* SPDX-License-Identifier: Apache-2.0
*/
import { EventHub } from '../utils/EventHub';

/**
* Mirror 是 JSpider 中的数据外放接口
*
* MessageHub 是集合所有事件的事件中心,
* 当 MessageHub 被 emit 时, MessageHub 发出相关的 Update 进行视图的更新
* 所有的外放接口使用 rxjs 进行 subscribe
*/

const MessageHub = new EventHub();

// Task 数据发生改变时
const TaskUpdate = MessageHub.createSource$('TaskUpdate');

export { MessageHub, TaskUpdate };
8 changes: 4 additions & 4 deletions src/Pipeline/PluginSystem.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,10 @@ class PLUGIN {
options = {},
operator,
}) {
const uuid = createUUID(main.toString());
if (operator) this.operator = operator;
// 写入自身中
Object.assign(this, {
name: name || uuid, // 名称,一般用作提示标记
uuid, // 唯一标识
name, // 名称,一般用作提示标记
main, // Plugin 中的功能性函数
init, // 初始化整个 Plugin 的函数
error, // 函数错误时的事件
Expand All @@ -57,7 +55,9 @@ class PLUGIN {
forceRetry, // 是否强制重新使用 Plugin
});
}

initUUID(index) {
this.uuid = createUUID(this.main.toString() + index);
}
// 对 main 函数外包一层,直接启动 main 函数的执行,返回一条流
TaskStarter(task) {
return of(task).pipe(
Expand Down
14 changes: 11 additions & 3 deletions src/Pipeline/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,26 @@ export class Pipeline {
this.Plugins = Plugins;
}
UUID = null; // 唯一的标识
operator = null; // 主要被引用的 operator
operator = null; // 组合起来的最终的 operator
pluginUUIDMap = new Map();

#PluginQueue = new functionQueue(); // 准备 Plugin 中的异步 init 事件
preparePipeline() {
let uuidString = '';
// ! 一次遍历实现取出 operator 和 导出 plugin init 函数的 promise 链,并延长 uuidString 用于创建 UUID
const pipeline = this.Plugins.map((plugin) => {
uuidString += plugin.operator.toString();
const pipeline = this.Plugins.map((plugin, index) => {
uuidString += plugin.main.toString();

if (plugin.init instanceof Function) {
this.#PluginQueue.enQueue(plugin.init);
}

// 需要注入 index 表示这个程序的位置
plugin.initUUID(index);

// 保存 uuid 的映射
this.pluginUUIDMap.set(plugin.uuid, plugin?.name || plugin.uuid);

// 将 plugin 中的 operator 注入 pipeline 中
return plugin.operator(this);
});
Expand Down
17 changes: 11 additions & 6 deletions src/Spider/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,21 @@
* @Last Modified time: 2021-07-19 16:34:32
*/
// Spider 是 JSpider 的爬虫流程工具,主要目的是爬取文件,负责如何爬取文件与数据。
import { EventHub } from '../utils/EventHub.js';
import { staticEvent } from './staticEvent';

import { Pipeline } from '../Pipeline/index';
import ControlPanel from '../ControlPanel/index.js';
import { TaskUpdate } from '../Mirror/Mirror.js';

// TODO 未完成接口的接入
// Spider 是 Console 的数据放送
export class Spider {
constructor(config = {}) {
this.config = config;
this.$EventHub = new EventHub(staticEvent, this); // 注册静态事件
constructor({ logEvery = false } = {}) {
this.config = {
logEvery,
};
if (logEvery)
TaskUpdate.subscribe((data) => {
console.log(data);
});
}
crawl(...args) {
ControlPanel.createFlow(args.flat());
Expand Down
6 changes: 0 additions & 6 deletions src/Spider/staticEvent.js

This file was deleted.

6 changes: 4 additions & 2 deletions src/TaskSystem/Task.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ export class Task {

// Plugin 的汇报口
$commit(type, ...payload) {
// 遵循内先外后的函数触发

const result = this.$store[type](...payload);
this.$EventHub.emit(type, ...payload);
return result;
Expand All @@ -30,8 +32,8 @@ export class Task {
}
$destroy() {
this._belongTo = null;
this.$off('*');
this.$commit('destroy'); // 通知外部,该 Task 被销毁
this.$commit('destroy'); // 先通知外部,该 Task 被销毁
this.$EventHub.off('*'); // 后进行自身销毁
}
get [Symbol.toStringTag]() {
return 'Task';
Expand Down
3 changes: 1 addition & 2 deletions src/TaskSystem/TaskGroup.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ export class TaskGroup extends Task {
// 删除所有的 link
$destroy() {
const copy = [...this._originData];
// 不进行事件的扩散
this.$EventHub.emit('destroy');
this.$EventHub.emit('destroy'); // 不进行事件的扩散, 只是自身的报销
return copy;
}
// 单独删除一个连接
Expand Down
5 changes: 5 additions & 0 deletions src/TaskSystem/TaskTypes.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/**
* @license
* Copyright 2021 KonghaYao 江夏尧 <dongzhongzhidong@qq.com>
* SPDX-License-Identifier: Apache-2.0
*/
import { types } from 'mobx-state-tree';
const TaskState = types.enumeration('TaskState', ['free', 'pending', 'complete', 'error', 'destroyed']);

Expand Down
4 changes: 2 additions & 2 deletions src/utils/EventHub.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
import { fromEventPattern } from 'rxjs';
import { memoize } from 'lodash-es';
import { share } from 'rxjs/operators';

/**
* EventHub 是一个事件处理中心,用于事件的接收与派发
Expand All @@ -21,7 +22,7 @@ export class EventHub {
return fromEventPattern(
(handle) => this.on(eventName, handle),
(handle) => this.off(eventName, handle),
);
).pipe(share());
});
}

Expand Down Expand Up @@ -72,7 +73,6 @@ export class EventHub {
}
}
emit(type, ...eventParams) {
console.log(type);
const handlers = this.all.get(type);
return handlers
? handlers.map((handler) => {
Expand Down
43 changes: 7 additions & 36 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,43 +23,14 @@ import {
switchMapTo,
exhaustMap,
} from 'rxjs/operators';
const emitter = mitt();
const pause$ = fromEventPattern((handle) => emitter.on('pause', handle));

function pauseToggle(openings, closings) {
return (observable) =>
new Observable((subscriber) => {
const buffers = new Set();
let closingSubscription = false;
const subscription = observable.subscribe(
(value) => {
closingSubscription ? buffers.add(value) : subscriber.next(value);
},
noop,
() => {
buffers.forEach((item) => subscriber.next(item));
subscriber.complete();
},
);

const openingSubscription = openings.subscribe(() => {
const emitBuffer = () => {
buffers.forEach((item) => subscriber.next(item));
buffers.clear();
closingSubscription.unsubscribe();
closingSubscription = false;
};
closingSubscription = closings.subscribe(emitBuffer);
});
return () => {
buffers.clear();
subscription.unsubscribe();
openingSubscription.unsubscribe();
if (closingSubscription) closingSubscription.unsubscribe();
};
});
}
const source$ = interval(100).pipe(take(14), pauseToggle(interval(500), interval(200)));
function a() {}
const source$ = fromEventPattern(
(handle) => {
a(handle);
},
() => {},
);

const startTime = new Date().getTime();
const compare = () => Math.ceil((new Date().getTime() - startTime) / 100);
Expand Down

0 comments on commit 8ea3697

Please sign in to comment.