diff --git a/package/plugins/Combine.js b/package/plugins/Combine.js index 68af911..877ffe3 100644 --- a/package/plugins/Combine.js +++ b/package/plugins/Combine.js @@ -7,9 +7,10 @@ import { Plugin } from '@src/Pipeline/PluginSystem.js'; import { of } from 'rxjs'; -import { bufferTime, concatMap, filter } from 'rxjs/operators'; +import { concatMap, filter } from 'rxjs/operators'; import { TaskGroup } from '../../src/TaskSystem/TaskGroup'; -export function Combine(number, waitTime = 5000, options = {}) { +import { BufferCountIn } from './utils/bufferCountIn'; +export function Combine(number, waitTime = 1000, options = {}) { return Plugin({ name: 'Combine', // 这个 name 是负责进行监控的标志符号 options, // 接收所有的参数,提供给所有函数使用 @@ -17,7 +18,7 @@ export function Combine(number, waitTime = 5000, options = {}) { // 复写 operator return (source) => source.pipe( - bufferTime(waitTime, undefined, number), + BufferCountIn(number, waitTime), filter((i) => i.length), // 必须要进行检测是否为空 concatMap((tasks) => of(new TaskGroup(tasks))), ); diff --git a/package/plugins/utils/bufferCountIn.js b/package/plugins/utils/bufferCountIn.js new file mode 100644 index 0000000..6276850 --- /dev/null +++ b/package/plugins/utils/bufferCountIn.js @@ -0,0 +1,56 @@ +/** + * @license + * Copyright 2021 KonghaYao 江夏尧 + * SPDX-License-Identifier: Apache-2.0 + */ +import { noop, Observable } from 'rxjs'; + +// 收集 3 个数据,但是前后之间间隔不超过 1000 ms +// 超过 1000 ms 时,放弃收集,直接返回 buffer 的数据 +export function BufferCountIn(countNumber = 3, maxWaitTime = 1000) { + return (observable) => + new Observable((subscriber) => { + const buffers = new Set(); + let TimeOut = 0; // 0 表示没有计时器 + + // 发送所有的数据,并将定时器删除 + const sendAll = () => { + if (buffers.size) buffers.forEach((item) => subscriber.next(item)); + }; + // 删除定时器标记, 没有时忽略 + const deleteTimeout = () => { + if (TimeOut) { + clearTimeout(TimeOut); + TimeOut = 0; + } + }; + const subscription = observable.subscribe( + (value) => { + // buffer.size 为 0 时,不设置定时,但是收集; + // 1 时,收集并设置定时器; + // 2 时,发送并重置定时器 + deleteTimeout(); + switch (buffers.size) { + case 0: + return buffers.add(value); + case countNumber - 2: + setTimeout(sendAll, maxWaitTime); + return buffers.add(value); + default: + sendAll(); + return subscriber.next(value); + } + }, + noop, + () => { + sendAll(); + buffers.clear(); + subscriber.complete(); + }, + ); + return () => { + buffers.clear(); + subscription.unsubscribe(); + }; + }); +} diff --git a/src/utils/pauseToggle.js b/src/utils/pauseToggle.js index cdcfad6..b743be1 100644 --- a/src/utils/pauseToggle.js +++ b/src/utils/pauseToggle.js @@ -26,6 +26,7 @@ export function pauseToggle(openings, closings) { noop, () => { buffers.forEach((item) => subscriber.next(item)); + buffers.clear(); subscriber.complete(); }, );