javascript - 如何使用 rxjs 有条件地缓冲?

标签 javascript rxjs

我试图解决的基本问题是我有一个节点服务器,它会发送日志错误并在抛出错误时向我们的团队发送错误电子邮件。如果出现令人难以置信的错误,我不想用数千封电子邮件向我们的电子邮件收件箱发送垃圾邮件。

因此,第一次抛出特定错误时,请发送电子邮件。之后,如果在 5 分钟的时间段内抛出相同的错误,我想收集这些错误并每 5 分钟将它们发送到一个数组中。如果可能,我想使用 rxjs。

我已经设置了一个几乎可行的简单场景。您可以通过 groupBy 查看我如何按类型聚合某些电子邮件。 bufferTime() 允许它在我指定的时间范围内收集电子邮件。

我遇到的问题是...我怎样才能先发送第一封电子邮件,然后在 5 分钟内收集其他所有邮件?

到目前为止,我已经尝试了 iif()、buffer() 和其他一些方法,但都没有成功。

        let subject = new Subject<number>();

        subject
            .pipe(
                groupBy(number => number % 2 === 0),
                mergeMap(group => group.pipe(
                    bufferTime(5000)
                )),
                filter(values => values.length > 0)
            )
            .subscribe(value => {
                console.log(value);
            });

        let index = 0;

        let handler = setInterval(() => {
            subject.next(index);
            index++;

            if(index >= 100) {
                clearInterval(handler);
                subject.complete();
            }
        }, 100);

这将输出...

[
   0,  2,  4,  6,  8, 10, 12, 14,
  16, 18, 20, 22, 24, 26, 28, 30,
  32, 34, 36, 38, 40, 42, 44
]
[
   1,  3,  5,  7,  9, 11, 13, 15,
  17, 19, 21, 23, 25, 27, 29, 31,
  33, 35, 37, 39, 41, 43, 45
]
[
  46, 48, 50, 52, 54, 56, 58,
  60, 62, 64, 66, 68, 70, 72,
  74, 76, 78, 80, 82, 84, 86,
  88
]
[
  47, 49, 51, 53, 55, 57, 59,
  61, 63, 65, 67, 69, 71, 73,
  75, 77, 79, 81, 83, 85, 87,
  89
]
[ 90, 92, 94, 96, 98 ]
[ 91, 93, 95, 97, 99 ]

我想输出的是这样的...

[ 0 ]
[ 1 ]
[
   2,  4,  6,  8, 10, 12, 14,
  16, 18, 20, 22, 24, 26, 28, 30,
  32, 34, 36, 38, 40, 42, 44
]
[
   3,  5,  7,  9, 11, 13, 15,
  17, 19, 21, 23, 25, 27, 29, 31,
  33, 35, 37, 39, 41, 43, 45
]
[
  46, 48, 50, 52, 54, 56, 58,
  60, 62, 64, 66, 68, 70, 72,
  74, 76, 78, 80, 82, 84, 86,
  88
]
[
  47, 49, 51, 53, 55, 57, 59,
  61, 63, 65, 67, 69, 71, 73,
  75, 77, 79, 81, 83, 85, 87,
  89
]
[ 90, 92, 94, 96, 98 ]
[ 91, 93, 95, 97, 99 ]

最佳答案

根据条件,需要立即打印每个组的第一个成员:

this.subject
  .pipe(
    groupBy(number => number % 2 === 0),
      mergeMap(group => {
        return merge(              
          group.pipe(first()),                
          group.pipe(bufferTime(5000))
        )       
      })
  ).subscribe(console.log)

关于javascript - 如何使用 rxjs 有条件地缓冲?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66179655/

相关文章:

javascript - 使用流实现去抖分块异步队列

javascript - setTimeout() 返回未捕获的类型错误

javascript - React 懒加载/无限滚动解决方案

angular - 比较 Observable 的前一个值和 Angular 中的下一个值

javascript - 结合最新。出现错误后继续观察

javascript - 是否可以从 Observable 中只删除一个 RXJS Observer?

javascript - 如何在函数 rxjs 中使用 observable 和管道?

javascript - Angular $http(...).success 不是函数有助于重写函数

javascript - 支持表格、图像和文件上传的富文本编辑器

javascript - 设置类方法的回调