javascript - RxJS:将历史数据与更新流相结合

标签 javascript rxjs

场景:

我正在通过一个简单的 ajax 调用加载一个初始数据数组,并将该数据放入 Observable,我将其称为历史。同时,我连接到一个 websocket 并定期接收数据,我们称之为更新,我想将此数据附加到历史

具体来说,假设 ajax 调用发回数组 [0,1,2] 并且套接字(随时间)发出 34, 5 然后我想像这样累积这些值:

[0,1,2]        // historical
[0,1,2,3]      // historical + updates1
[0,1,2,3,4]    // historical + updates1 + updates2
[0,1,2,3,4,5]  // etc

(请注意,这里有一个必须处理的并发边缘情况:historical 可能会产生 [0,1,2,3] 而第一个两个 updates34,在这种情况下我想要结束的仍然是 [0,1, 2,3,4]不是 [0,1,2,3,3,4]。)

最终目标是以单个 Observable stream 结束,它是 Observables historicalupdates 的组合,如所述。

到目前为止我尝试了什么:

仅累积 websocket 数据就足够容易了。我创建了 updates,这是 websocket 发出的 Observable 序列。每次观察到一个值时,我都可以使用 scan() 将其累积到数组中:

updates.scan((acc, update) => acc.concat([update]), [])

这会产生类似的结果

[3]
[3,4]
[3,4,5]

我的下一个问题是如何将它与历史 结合起来。由于 historical 的数据可能在已经观察到一个或多个 updates 之后到达,因此在我们等待 historical 时需要积累这些更新。我设法使用 withLatestFrom() 实现了这一点:

const stream = historical
  .withLatestFrom(
    updates.scan((acc, update) => acc.concat([update]), []),
    (history, buffer) => history.concat(buffer) /* could eliminate duplicates here */
  )

观察 stream 产生一个值,[0,1,2,3,4,5],它是 historical 的组合以及在历史之前到达的任何更新。正是我想要的。

但是,我不知道从那里去哪里。我怎样才能继续将 updates 附加到 stream 以便随着时间的推移,stream 产生如下内容:

[0,1,2,3,4,5]
[0,1,2,3,4,5,6]
[0,1,2,3,4,5,6,7]

我没有看到为此使用 scan 的方法,就像我为 updates 所做的那样,因为在这种情况下我需要 scan 的初始(种子)值是一个 Observable,而不是一个数组。

有没有一种方法可以做到这一点——可以通过添加我目前已有的方法,还是可以使用更好的替代方法来完成整个事情?

最佳答案

如果我没理解错,我会使用 skipUntil()运算符(operator)继续收集更新而不进一步发出它们。然后对于 withLatestFrom() 运算符,我会选择 updates Observable 作为它的来源。这将等待 skipUntil(),直到历史数据可用,然后在 updates 的每次发射时发出。

let updates = Observable
  .timer(0, 1000)
  .scan((acc, update) => {
    acc.push(update);
    return acc;
  }, []);

let historical = Observable.defer(() => { 
    console.log('Sending AJAX request ...');
    return Observable.of(['h1', 'h2', 'h3']); 
  })
  .delay(3000)
  .share();


const stream = updates.skipUntil(historical)
  .withLatestFrom(historical, (buffer, history) => {
    return history.concat(buffer);
  })
  .map(val => val) // remove duplicates;


stream.subscribe(val => console.log(val));

控制台输出如下:

Sending AJAX request ...
["h1", "h2", "h3", 0, 1, 2, 3]
["h1", "h2", "h3", 0, 1, 2, 3, 4]
["h1", "h2", "h3", 0, 1, 2, 3, 4, 5]

查看现场演示:https://jsbin.com/kumolez/11/edit?js,console

我不知道你的用例是什么,但我会尽量避免使用 concat() 因为当 buffer 增长时它可能会变慢。

此外,如果您在更新到达一个项目时发出更新(而不是累积它们),您可以使用 distinct() 运算符来过滤掉重复项。

顺便说一句,我假设您使用的是 RxJS 5。

关于javascript - RxJS:将历史数据与更新流相结合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42235610/

相关文章:

javascript - AJAX 部分加载后 Vue 绑定(bind)?

javascript - RxJS 根据外部条件从流中提取数据

javascript - 每个 Observable 的值发射运行一次操作

javascript - fullcalendar 不适用于 Mozilla 和 Internet Explorer

javascript - Angular 从服务返回 promise 对象。尝试访问值来更改 ng-if 指令

javascript - 如何阻止 Kendo UI MultiSelect 在点击时打开?

javascript - 如何使用 rxjs 在 angular2 中的输入 keyup 事件上实现去抖动服务

javascript - 获取 BehaviorSubject 的值

javascript - 如何在 redux-observable 中正确使用 forkJoin?

javascript - jQGrid - 更改列的顺序