javascript - ReactiveX:Group and Buffer only last item in each group

标签 javascript rxjs reactivex

如何对一个 Observable 进行分组,并从每个 GroupedObservable 中仅在内存中保留最后发出的项目? 这样每个组的行为就像 BehaviorSubject 一样。

像这样:

{user: 1, msg: "Anyone here?"}
{user: 2, msg: "Hi"}
{user: 2, msg: "How are you?"}
{user: 1, msg: "Hello"}
{user: 1, msg: "Good"}

所以在内存中我们只有每个用户的最后一项:

{user: 2, msg: "How are you?"}
{user: 1, msg: "Good"}

当订阅者订阅时,这两个项目会立即发布(每个都有自己的发射)。就像我们为每个 user 设置了 BehaviorSubject。

永远不会触发 onCompleted(),因为人们可能永远聊天。

我事先不知道 user 值是什么。

最佳答案

我假设您的聊天记录 observable 很热。因此,#groupBy 发出的 groupObservables 也会很热,并且不会自行在内存中保留任何内容。

要获得您想要的行为(丢弃订阅前的最后一个值以外的所有内容并从那里继续),您可以使用 ReplaySubject(1)。

如有错误请指正

参见 jsbin

var groups = chatlog
      .groupBy(message => message.user)
      .map(groupObservable => {
        var subject = new Rx.ReplaySubject(1);
        groupObservable.subscribe(value => subject.onNext(value));
        return subject;
      });

关于javascript - ReactiveX:Group and Buffer only last item in each group,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34145876/

相关文章:

javascript - 编写 JavaScript 小部件的最佳实践

javascript - Onclick 没有被注册,一直告诉我它是未定义的

javascript - 在 JavaScript 中合并对象

angular4 ObjectUnsubscribedError

websocket - 如何防止websocket在angular5中的特定时间后关闭?

python - 如何使用可观察的 RxPY 间隔定期调用异步协程?

javascript - javascript中 "click"事件的嵌套函数调用

使用@ngneat/until-destroy npm 包取消订阅 Angular 9 observable

rx-swift - RxSwift map 中的多个可观察值

java - 如何从 InputStream 创建 Vert.x ReadStream