javascript - 带反压的 Rxjs 缓冲区实现

标签 javascript reactive-programming rxjs

RxJS新手问题来了! 所以我有这个基本缓冲区,它将 source1 和 source2 中的所有内容附加到一个数组中。在某些条件下,缓冲区会被清除。

var buffer = Rx.Observable.merge(source1, source2).scan(
  function (arr, item) { 
    if (!magic) { 
      return arr.push(item); 
    }
    else { 
      return [item]; //Clear the buffer from previous items
    } 
  }, []);

我还希望有一个缓冲区的“消费者”,它可以从缓冲区中转移项目并使用它们进行操作。我如何实现它并确保消费者更新可观察的缓冲区?

编辑:我想将数据输入 SourceBuffer但只允许在不更新时向其追加数据。我猜这给了我一个背压的情况。所以我确实尝试创建一个 controlled observable但无法弄清楚如何使用我自己的缓冲创建我自己的版本。

最佳答案

So I have this basic buffer which appends everything from source1 and source2 to an array. Under certain conditions the buffer is cleared.

您需要在这里做的是:

var sourceStream = Rx.Observable.merge(soruce1, source2);

var boundary = sourceStream.lift(someOperator) // for example sourceStream.skip(3);
// someOperator is where you perform the "magic"

var subscribeToThisStream = sourceStream.buffer( boundary ); 
// emits all items collected in the buffer between two boundary emitions

I also want to have a "consumer" of the buffer, which shifts items from the buffer and does things with them. How do I implement that and make sure the consumer updates the buffer observable?

如果您希望通过订阅者来做到这一点,如果您想以 Rx 方式做事,这绝对是不可取的,在某些情况下甚至可能是不可能的。

关于javascript - 带反压的 Rxjs 缓冲区实现,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32400174/

相关文章:

typescript - 如何在列表中正确使用 Observable?

javascript - qml 函数的默认参数给出语法错误

reactive-programming - Smallrye Mutiny 中的 switchIfEmpty 替代方案是什么

javascript - 在 rxjs Observable 中抛出错误

java - 如何等待异步 Observable 在另一个 Observable 上完成

javascript - 等待所有 Observables 完成

javascript - 有没有办法使用 lightbox2 实现图像之间更平滑的过渡?

javascript - React - 在从另一个类调用的函数中调用 setState 不会抛出函数异常

javascript - sails.js 中的某些 URL 可以免除 CSRF 吗?

angular - rxjs可观察的重用逻辑