如何对一个 Observable 进行分组,并从每个 GroupedObservable 中仅在内存中保留最后发出的项目? 这样每个组的行为就像 BehaviorSubject 一样。
像这样:
{user: 1, msg: "Anyone here?"}
{user: 2, msg: "Hi"}
{user: 2, msg: "How are you?"}
{user: 1, msg: "Hello"}
{user: 1, msg: "Good"}
所以在内存中我们只有每个用户
的最后一项:
{user: 2, msg: "How are you?"}
{user: 1, msg: "Good"}
当订阅者订阅时,这两个项目会立即发布(每个都有自己的发射)。就像我们为每个 user
设置了 BehaviorSubject。
永远不会触发 onCompleted(),因为人们可能永远聊天。
我事先不知道 user
值是什么。
最佳答案
我假设您的聊天记录 observable 很热。因此,#groupBy 发出的 groupObservables 也会很热,并且不会自行在内存中保留任何内容。
要获得您想要的行为(丢弃订阅前的最后一个值以外的所有内容并从那里继续),您可以使用 ReplaySubject(1)。
如有错误请指正
参见 jsbin
var groups = chatlog
.groupBy(message => message.user)
.map(groupObservable => {
var subject = new Rx.ReplaySubject(1);
groupObservable.subscribe(value => subject.onNext(value));
return subject;
});
关于javascript - ReactiveX:Group and Buffer only last item in each group,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34145876/