我正在尝试实现与缓冲区计数非常相似的东西。当值通过管道时,bufferCount 当然会缓冲它们并批量发送它们。我想要与此类似的东西,如果当前流中的缓冲区大小少于缓冲区大小,它将发出所有剩余的项目。
这有点令人困惑,所以我将提供一个示例来说明我想要实现的目标。
我有一些东西可以将项目单独添加到主题中。有时它会每分钟添加 1
个项目,有时会在 1
秒内添加 1000
个项目。我希望对这些项目批量执行长时间运行的过程(2
秒~),以免服务器过载。
例如,考虑 P 正在处理的时间线
---A-----------B----------C---D--EFGHI------------------
|_( P(A) ) |_(P(B)) |_( P(C) ) |_(P([D, E, F, G, H, I]))
通过这种方式,我可以根据要处理的事件数量来小批量或大批量处理事件,但我确保批量保持小于 X。
我基本上需要将所有单独的发射映射到包含 5 个或更少 block 的发射。当我将事件通过管道传输到 concatMap 时,事件将开始堆积。我想批量挑选这些堆积的事件。我怎样才能实现这个目标?
这是我迄今为止所得到的堆栈 Blitz :https://stackblitz.com/edit/rxjs-iqwcbh?file=index.ts
请注意,在更多项进入并填满缓冲区之前,第 4 项和第 5 项不会被处理。理想情况下,在处理完 1,2,3 后,它将挑选队列中的 4,5。然后当 6,7,8 进来时,它会处理这些。
最佳答案
编辑:今天我了解到 bufferTime
有一个 maxBufferSize
参数,当缓冲区达到该大小时将发出该参数。因此,下面的原始答案是不必要的,我们可以简单地这样做:
const stream$ = subject$.pipe(
bufferTime(2000, null, 3), // <-- buffer emits @ 2000ms OR when 3 items collected
filter(arr => !!arr.length)
);
原文:
听起来您想要 bufferCount
的组合和 bufferTime
。换句话说:“当缓冲区达到大小 X 或经过 Y 时间后释放缓冲区”。
我们可以使用race
运算符,与其他两个一起创建一个可观察对象,该可观察对象在缓冲区达到所需大小或持续时间过去后发出。我们还需要 take
的帮助。和 repeat
:
const chunk$ = subject$.pipe(bufferCount(3));
const partial$ = subject$.pipe(
bufferTime(2000),
filter(arr => !!arr.length) // don't emit empty array
);
const stream$ = race([chunk$, partial$]).pipe(
take(1),
repeat()
);
这里我们将 stream$
定义为 chunk$
和 partial$
之间第一个发出的。但是,race
只会使用第一个发出的源,因此我们使用 take(1)
和 repeat
来对“重置”进行排序比赛”。
然后您可以使用 concatMap
进行工作,如下所示:
stream$.pipe(
concatMap(chunk => this.doWorkWithChunk(chunk))
);
这是一个工作 StackBlitz演示。
您可能想将其滚动到自定义运算符中,因此您可以简单地执行如下操作:
const stream$ = subject$.pipe(
bufferCountTime(5, 2000)
);
bufferCountTime()
的定义可能如下所示:
function bufferCountTime<T>(count: number, time: number) {
return (source$: Observable<T>) => {
const chunk$ = source$.pipe(bufferCount(count));
const partial$ = source$.pipe(
bufferTime(time),
filter((arr: T[]) => !!arr.length)
);
return race([chunk$, partial$]).pipe(
take(1),
repeat()
);
}
}
另一个StackBlitz样本。
关于rxjs - 如果项目数少于缓冲区计数,则如何在发出时获得与 bufferCount 类似的行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69122873/