rxjs - 如果项目数少于缓冲区计数,则如何在发出时获得与 bufferCount 类似的行为

标签 rxjs

我正在尝试实现与缓冲区计数非常相似的东西。当值通过管道时,bufferCount 当然会缓冲它们并批量发送它们。我想要与此类似的东西,如果当前流中的缓冲区大小少于缓冲区大小,它将发出所有剩余的项目。

这有点令人困惑,所以我将提供一个示例来说明我想要实现的目标。

我有一些东西可以将项目单独添加到主题中。有时它会每分钟添加 1 个项目,有时会在 1 秒内添加 1000 个项目。我希望对这些项目批量执行长时间运行的过程(2 秒~),以免服务器过载。

例如,考虑 P 正在处理的时间线

---A-----------B----------C---D--EFGHI------------------
   |_( P(A) )  |_(P(B))   |_(  P(C)  ) |_(P([D, E, F, G, H, I]))

通过这种方式,我可以根据要处理的事件数量来小批量或大批量处理事件,但我确保批量保持小于 X。

我基本上需要将所有单独的发射映射到包含 5 个或更少 block 的发射。当我将事件通过管道传输到 concatMap 时,事件将开始堆积。我想批量挑选这些堆积的事件。我怎样才能实现这个目标?

这是我迄今为止所得到的堆栈 Blitz :https://stackblitz.com/edit/rxjs-iqwcbh?file=index.ts

请注意,在更多项进入并填满缓冲区之前,第 4 项和第 5 项不会被处理。理想情况下,在处理完 1,2,3 后,它将挑选队列中的 4,5。然后当 6,7,8 进来时,它会处理这些。

最佳答案

编辑:今天我了解到 bufferTime 有一个 maxBufferSize 参数,当缓冲区达到该大小时将发出该参数。因此,下面的原始答案是不必要的,我们可以简单地这样做:

const stream$ = subject$.pipe(
  bufferTime(2000, null, 3), // <-- buffer emits @ 2000ms OR when 3 items collected
  filter(arr => !!arr.length)
);

StackBlitz


原文:

听起来您想要 bufferCount 的组合和 bufferTime 。换句话说:“当缓冲区达到大小 X 或经过 Y 时间后释放缓冲区”。

我们可以使用race运算符,与其他两个一起创建一个可观察对象,该可观察对象在缓冲区达到所需大小或持续时间过去后发出。我们还需要 take 的帮助。和 repeat :

const chunk$ = subject$.pipe(bufferCount(3));

const partial$ = subject$.pipe(
  bufferTime(2000),
  filter(arr => !!arr.length) // don't emit empty array
);

const stream$ = race([chunk$, partial$]).pipe(
  take(1),
  repeat()
);

这里我们将 stream$ 定义为 chunk$partial$ 之间第一个发出的。但是,race 只会使用第一个发出的源,因此我们使用 take(1)repeat 来对“重置”进行排序比赛”。

然后您可以使用 concatMap 进行工作,如下所示:

stream$.pipe(
  concatMap(chunk => this.doWorkWithChunk(chunk))
);

这是一个工作 StackBlitz演示。


您可能想将其滚动到自定义运算符中,因此您可以简单地执行如下操作:

const stream$ = subject$.pipe(
  bufferCountTime(5, 2000)
);

bufferCountTime() 的定义可能如下所示:

function bufferCountTime<T>(count: number, time: number) {
  
  return (source$: Observable<T>) => {
    const chunk$ = source$.pipe(bufferCount(count));
    const partial$ = source$.pipe(
      bufferTime(time),
      filter((arr: T[]) => !!arr.length)
    );

    return race([chunk$, partial$]).pipe(
      take(1),
      repeat()
    );
  }
}

另一个StackBlitz样本。

关于rxjs - 如果项目数少于缓冲区计数,则如何在发出时获得与 bufferCount 类似的行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69122873/

相关文章:

angularjs - 使用 Angular2 和 TypeScript 将 http 功能移动到它自己的服务中

javascript - 在将少量记录插入 Postgres 数据库后,Nodejs 应用程序使用 knex 中断

angular - RxJS:将多个 http 请求组合成单个、扁平的可观察数组

javascript - 如何根据第一个流对两个流进行分组和组合?

javascript - 从 rxjs 可以观察到

Angular 主题订阅不起作用

angular - 捕获错误并返回 of(undefined) 会停止流(Angular 7,rxjs6)

javascript - 从其他可观察对象切换 Rxjs 流

javascript - 为什么可观察性重新订阅热门

angular - 如何从 Observable 内部返回 Observable