RxJS新手问题来了! 所以我有这个基本缓冲区,它将 source1 和 source2 中的所有内容附加到一个数组中。在某些条件下,缓冲区会被清除。
var buffer = Rx.Observable.merge(source1, source2).scan(
function (arr, item) {
if (!magic) {
return arr.push(item);
}
else {
return [item]; //Clear the buffer from previous items
}
}, []);
我还希望有一个缓冲区的“消费者”,它可以从缓冲区中转移项目并使用它们进行操作。我如何实现它并确保消费者更新可观察的缓冲区?
编辑:我想将数据输入 SourceBuffer但只允许在不更新时向其追加数据。我猜这给了我一个背压的情况。所以我确实尝试创建一个 controlled observable但无法弄清楚如何使用我自己的缓冲创建我自己的版本。
最佳答案
So I have this basic buffer which appends everything from source1 and source2 to an array. Under certain conditions the buffer is cleared.
您需要在这里做的是:
var sourceStream = Rx.Observable.merge(soruce1, source2);
var boundary = sourceStream.lift(someOperator) // for example sourceStream.skip(3);
// someOperator is where you perform the "magic"
var subscribeToThisStream = sourceStream.buffer( boundary );
// emits all items collected in the buffer between two boundary emitions
I also want to have a "consumer" of the buffer, which shifts items from the buffer and does things with them. How do I implement that and make sure the consumer updates the buffer observable?
如果您希望通过订阅者来做到这一点,如果您想以 Rx 方式做事,这绝对是不可取的,在某些情况下甚至可能是不可能的。
关于javascript - 带反压的 Rxjs 缓冲区实现,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32400174/