我有一个数字流,它增加了一个我想要子采样的常数。给定一个常数样本interval
,我想缓冲流,直到第一个和最后一个缓冲值之间的差异大于或等于interval
.然后它会发出这个数组,类似于 buffer运算符(operator)。
我已经搜索了不同的 rxjs 运算符,但无法弄清楚如何使其工作。一个 bufferUntil
运营商将是完美的,但似乎不存在。我该如何实现?
例如:
const interval = 15;
//example stream would be: 5, 10 , 15, 20, 25, 30..
Observable.pipe(
bufferUntil(bufferedArray => {
let last = bufferedArray.length - 1;
return (bufferedArray[last] - bufferedArray[0] >= interval);
})
).subscribe(x => console.log(x));
//With an expected output of [5, 10, 15, 20], [ 25, 30, 35, 40],..
最佳答案
您可以实现一个维护自定义缓冲区的运算符。将所有传入值插入缓冲区,当满足给定条件时发出缓冲区并重置它。使用defer
为每个订阅者提供自己的缓冲区。
function bufferUntil<T>(emitWhen: (currentBuffer: T[]) => boolean): OperatorFunction<T, T[]> {
return (source: Observable<T>) => defer(() => {
let buffer: T[] = []; // custom buffer
return source.pipe(
tap(v => buffer.push(v)), // add values to buffer
switchMap(() => emitWhen(buffer) ? of(buffer) : EMPTY), // emit the buffer when the condition is met
tap(() => buffer = []) // clear the buffer
)
});
}
https://stackblitz.com/edit/rxjs-7awqmv
关于javascript - rxjs - 缓冲流直到函数返回 true,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59289937/