Skip to content

Commit

Permalink
feat(combine): 完成 combine 设计
Browse files Browse the repository at this point in the history
  • Loading branch information
KonghaYao committed Aug 8, 2021
1 parent faaf518 commit 6b90ee7
Show file tree
Hide file tree
Showing 8 changed files with 716 additions and 36,145 deletions.
50 changes: 27 additions & 23 deletions dist/JSpider.cjs.min.js

Large diffs are not rendered by default.

18,275 changes: 275 additions & 18,000 deletions dist/JSpider.esm.min.js

Large diffs are not rendered by default.

18,379 changes: 324 additions & 18,055 deletions dist/JSpider.min.js

Large diffs are not rendered by default.

72 changes: 38 additions & 34 deletions dist/JSpider.umd.min.js

Large diffs are not rendered by default.

12 changes: 9 additions & 3 deletions package/plugins/Combine.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,23 @@ import { of } from 'rxjs';
import { concatMap, filter } from 'rxjs/operators';
import { TaskGroup } from '../../src/TaskSystem/TaskGroup';
import { BufferCountIn } from './utils/bufferCountIn';
export function Combine(number, waitTime = 1000, options = {}) {
export function Combine(number, waitTime = 1000, combineFunction = undefined) {
return Plugin({
name: 'Combine', // 这个 name 是负责进行监控的标志符号
options, // 接收所有的参数,提供给所有函数使用
main: combineFunction,
operator() {
// 复写 operator
return (source) =>
source.pipe(
BufferCountIn(number, waitTime),
filter((i) => i.length), // 必须要进行检测是否为空
concatMap((tasks) => of(new TaskGroup(tasks))),
concatMap((tasks) => {
const cb = of(new TaskGroup(tasks));
if (combineFunction instanceof Function) {
return cb.pipe(concatMap((task) => this.TaskStarter(task)));
}
return cb;
}),
);
},
});
Expand Down
23 changes: 15 additions & 8 deletions package/plugins/utils/bufferCountIn.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ export function BufferCountIn(countNumber = 3, maxWaitTime = 1000) {

// 发送所有的数据,并将定时器删除
const sendAll = () => {
if (buffers.size) buffers.forEach((item) => subscriber.next(item));
if (buffers.size) {
subscriber.next([...buffers]);
buffers.clear();
}
};
// 删除定时器标记, 没有时忽略
const deleteTimeout = () => {
Expand All @@ -24,27 +27,31 @@ export function BufferCountIn(countNumber = 3, maxWaitTime = 1000) {
TimeOut = 0;
}
};
const UpdateTimeout = () => {
TimeOut = setTimeout(sendAll, maxWaitTime);
};
const subscription = observable.subscribe(
(value) => {
console.log(TimeOut);
// buffer.size 为 0 时,不设置定时,但是收集;
// 1 时,收集并设置定时器;

// 2 时,发送并重置定时器
// 其他 时,收集并设置定时器;
deleteTimeout();
switch (buffers.size) {
case 0:
return buffers.add(value);
case countNumber - 2:
setTimeout(sendAll, maxWaitTime);
return buffers.add(value);
case countNumber - 1:
buffers.add(value);
return sendAll();
default:
sendAll();
return subscriber.next(value);
UpdateTimeout();
return buffers.add(value);
}
},
noop,
() => {
sendAll();
buffers.clear();
subscriber.complete();
},
);
Expand Down
42 changes: 24 additions & 18 deletions test/Request.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,32 @@ export async function main() {
spider.pipeline(
Request({ delay: 100, buffer: 2, retry: 3, handleError: null }),

Combine(3, 1000),
ExcelHelper(
(data) => {
return {
[new Date().getTime()]: data
.map((i) => i.data)
.flat()
.flat()
.flat(),
};
},
{
XLSXOptions: {
bookType: 'csv', // 可以指定为 csv 或者 xlsx
},
},
),
Combine(3, 1000, (array) => {
return array.map((i) => i.data).flat();
}),
// Combine(3, 1000), 如果没有处理函数,则下一个 plugin 将会收到一个数组
Plugin((data) => {
console.log('this', data);
}),
// ExcelHelper(
// (data) => {
// return {
// [new Date().getTime()]: data
// .map((i) => i.data)
// .flat()
// .flat()
// .flat(),
// };
// },
// {
// XLSXOptions: {
// bookType: 'csv', // 可以指定为 csv 或者 xlsx
// },
// },
// ),
// ZipFile({ chunk: 2 }),

Download(),
// Download(),
);
spider.crawl(urls);
spider.start();
Expand Down
8 changes: 4 additions & 4 deletions test/text-bilibili.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import('https://cdn.jsdelivr.net/npm/js-spider@3.2.2/dist/JSpider.esm.min.js').then(async (res) => {
import('https://cdn.jsdelivr.net/npm/js-spider@3.2.3/dist/JSpider.esm.min.js').then(async (res) => {
let JSpider = res.default;
let {
Plugin,
Expand Down Expand Up @@ -33,9 +33,9 @@ import('https://cdn.jsdelivr.net/npm/js-spider@3.2.2/dist/JSpider.esm.min.js').t
Request({
buffer: 1,
}),
Combine(50, 5000),
Plugin((data) => {
return data
Combine(50, 1000, (dataArray) => {
return dataArray
.flat()
.map((i) => {
i.data.result.forEach((item) => {
['hit_columns', 'new_rec_tags'].forEach((ii) => (item[ii] = ''));
Expand Down

0 comments on commit 6b90ee7

Please sign in to comment.