场景:
我正在通过一个简单的 ajax 调用加载一个初始数据数组,并将该数据放入 Observable,我将其称为历史。同时,我连接到一个 websocket 并定期接收数据,我们称之为更新,我想将此数据附加到历史。
具体来说,假设 ajax 调用发回数组 [0,1,2]
并且套接字(随时间)发出 3
、4
, 5
然后我想像这样累积这些值:
[0,1,2] // historical
[0,1,2,3] // historical + updates1
[0,1,2,3,4] // historical + updates1 + updates2
[0,1,2,3,4,5] // etc
(请注意,这里有一个必须处理的并发边缘情况:historical 可能会产生 [0,1,2,3]
而第一个两个 updates 是 3
和 4
,在这种情况下我想要结束的仍然是 [0,1, 2,3,4]
— 不是 [0,1,2,3,3,4]
。)
最终目标是以单个 Observable stream 结束,它是 Observables historical 和 updates 的组合,如所述。
到目前为止我尝试了什么:
仅累积 websocket 数据就足够容易了。我创建了 updates,这是 websocket 发出的 Observable 序列。每次观察到一个值时,我都可以使用 scan()
将其累积到数组中:
updates.scan((acc, update) => acc.concat([update]), [])
这会产生类似的结果
[3]
[3,4]
[3,4,5]
我的下一个问题是如何将它与历史 结合起来。由于 historical 的数据可能在已经观察到一个或多个 updates 之后到达,因此在我们等待 historical 时需要积累这些更新。我设法使用 withLatestFrom()
实现了这一点:
const stream = historical
.withLatestFrom(
updates.scan((acc, update) => acc.concat([update]), []),
(history, buffer) => history.concat(buffer) /* could eliminate duplicates here */
)
观察 stream 产生一个值,[0,1,2,3,4,5]
,它是 historical 的组合以及在历史之前到达的任何更新。正是我想要的。
但是,我不知道从那里去哪里。我怎样才能继续将 updates 附加到 stream 以便随着时间的推移,stream 产生如下内容:
[0,1,2,3,4,5]
[0,1,2,3,4,5,6]
[0,1,2,3,4,5,6,7]
我没有看到为此使用 scan
的方法,就像我为 updates 所做的那样,因为在这种情况下我需要 scan
的初始(种子)值是一个 Observable,而不是一个数组。
有没有一种方法可以做到这一点——可以通过添加我目前已有的方法,还是可以使用更好的替代方法来完成整个事情?
最佳答案
如果我没理解错,我会使用 skipUntil()
运算符(operator)继续收集更新而不进一步发出它们。然后对于 withLatestFrom()
运算符,我会选择 updates
Observable 作为它的来源。这将等待 skipUntil()
,直到历史数据可用,然后在 updates
的每次发射时发出。
let updates = Observable
.timer(0, 1000)
.scan((acc, update) => {
acc.push(update);
return acc;
}, []);
let historical = Observable.defer(() => {
console.log('Sending AJAX request ...');
return Observable.of(['h1', 'h2', 'h3']);
})
.delay(3000)
.share();
const stream = updates.skipUntil(historical)
.withLatestFrom(historical, (buffer, history) => {
return history.concat(buffer);
})
.map(val => val) // remove duplicates;
stream.subscribe(val => console.log(val));
控制台输出如下:
Sending AJAX request ...
["h1", "h2", "h3", 0, 1, 2, 3]
["h1", "h2", "h3", 0, 1, 2, 3, 4]
["h1", "h2", "h3", 0, 1, 2, 3, 4, 5]
查看现场演示:https://jsbin.com/kumolez/11/edit?js,console
我不知道你的用例是什么,但我会尽量避免使用 concat()
因为当 buffer
增长时它可能会变慢。
此外,如果您在更新到达一个项目时发出更新(而不是累积它们),您可以使用 distinct()
运算符来过滤掉重复项。
顺便说一句,我假设您使用的是 RxJS 5。
关于javascript - RxJS:将历史数据与更新流相结合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42235610/