Skip to content

Commit

Permalink
feat(buffercountin): update Combine Plugin 中的缓存逻辑
Browse files Browse the repository at this point in the history
  • Loading branch information
KonghaYao committed Aug 8, 2021
1 parent cdc92ac commit faaf518
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 3 deletions.
7 changes: 4 additions & 3 deletions package/plugins/Combine.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@

import { Plugin } from '@src/Pipeline/PluginSystem.js';
import { of } from 'rxjs';
import { bufferTime, concatMap, filter } from 'rxjs/operators';
import { concatMap, filter } from 'rxjs/operators';
import { TaskGroup } from '../../src/TaskSystem/TaskGroup';
export function Combine(number, waitTime = 5000, options = {}) {
import { BufferCountIn } from './utils/bufferCountIn';
export function Combine(number, waitTime = 1000, options = {}) {
return Plugin({
name: 'Combine', // 这个 name 是负责进行监控的标志符号
options, // 接收所有的参数,提供给所有函数使用
operator() {
// 复写 operator
return (source) =>
source.pipe(
bufferTime(waitTime, undefined, number),
BufferCountIn(number, waitTime),
filter((i) => i.length), // 必须要进行检测是否为空
concatMap((tasks) => of(new TaskGroup(tasks))),
);
Expand Down
56 changes: 56 additions & 0 deletions package/plugins/utils/bufferCountIn.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* @license
* Copyright 2021 KonghaYao 江夏尧 <dongzhongzhidong@qq.com>
* SPDX-License-Identifier: Apache-2.0
*/
import { noop, Observable } from 'rxjs';

// 收集 3 个数据,但是前后之间间隔不超过 1000 ms
// 超过 1000 ms 时,放弃收集,直接返回 buffer 的数据
export function BufferCountIn(countNumber = 3, maxWaitTime = 1000) {
return (observable) =>
new Observable((subscriber) => {
const buffers = new Set();
let TimeOut = 0; // 0 表示没有计时器

// 发送所有的数据,并将定时器删除
const sendAll = () => {
if (buffers.size) buffers.forEach((item) => subscriber.next(item));
};
// 删除定时器标记, 没有时忽略
const deleteTimeout = () => {
if (TimeOut) {
clearTimeout(TimeOut);
TimeOut = 0;
}
};
const subscription = observable.subscribe(
(value) => {
// buffer.size 为 0 时,不设置定时,但是收集;
// 1 时,收集并设置定时器;
// 2 时,发送并重置定时器
deleteTimeout();
switch (buffers.size) {
case 0:
return buffers.add(value);
case countNumber - 2:
setTimeout(sendAll, maxWaitTime);
return buffers.add(value);
default:
sendAll();
return subscriber.next(value);
}
},
noop,
() => {
sendAll();
buffers.clear();
subscriber.complete();
},
);
return () => {
buffers.clear();
subscription.unsubscribe();
};
});
}
1 change: 1 addition & 0 deletions src/utils/pauseToggle.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export function pauseToggle(openings, closings) {
noop,
() => {
buffers.forEach((item) => subscriber.next(item));
buffers.clear();
subscriber.complete();
},
);
Expand Down

0 comments on commit faaf518

Please sign in to comment.