rxjs - 递归可观察

标签 rxjs reactivex

我正在使用 RxJs,我必须制定一个轮询机制来从服务器检索更新。

我需要每秒发出一个请求,解析更新,发出它并记住它的 ID,因为我需要它来请求下一个更新包,例如 getUpdate(lastId + 1)

第一部分很简单,所以我只需将 intervalmergeMap 结合使用

let lastId = 0
const updates = Rx.Observable.interval(1000)
    .map(() => lastId)
    .mergeMap((offset) => getUpdates(offset + 1))

我正在收集这样的标识符:

updates.pluck('update_id').scan(Math.max, 0).subscribe(val => lastId = val)

但是这个解决方案不是纯粹的响应式(Reactive),我正在寻找省略“全局”变量的使用的方法。

如何改进代码,同时仍然能够返回仅包含调用者更新的可观察值?

UPD。

getUpdates(id) 的服务器响应如下所示:

[
  { update_id: 1, payload: { ... } },
  { update_id: 3, payload: { ... } },
  { update_id: 2, payload: { ... } }
]

它可能包含任何顺序的 0 到无穷大更新

最佳答案

有这样的事吗?请注意,这是一个无限流,因为没有条件中止;你没有给。

// Just returns the ID as the update_id.
const fakeResponse = id => {
  return [{ update_id: id }];
};

// Fakes the actual HTTP call with a network delay.
const getUpdates = id => Rx.Observable.of(null).delay(250).map(() => fakeResponse(id));

// Start with update_id = 0, then recursively call with the last
// returned ID incremented by 1.
// The actual emissions on this stream will be the full server responses.
const updates$ = getUpdates(0)
  .expand(response => Rx.Observable.of(null)
    .delay(1000)
    .switchMap(() => {
      const highestId = Math.max(...response.map(update => update.update_id));
      return getUpdates(highestId + 1);
    })
  )

updates$.take(5).subscribe(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>

要定义流的终止,您可能需要在末尾 Hook 到 switchMap ;使用 response 的任何属性有条件地返回 Observable.empty(),而不是再次调用 getUpdates

关于rxjs - 递归可观察,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47968955/

相关文章:

javascript - 有没有办法将 rxjs 与 ng2-charts 一起使用?

javascript - 如何在 rxjs 中为每个订阅在可观察管道上执行一次初始化逻辑

dart - Flutter:将Observable1.mergeWith([observable2])喂入后,Stream Builder无法正常工作

java - 检查 RxJava 中是否有订阅者抛出异常

javascript - 如何使用 Angular 处理 RxJS 中的错误

javascript - 为什么 `BehaviorSubject` 没有发出最后一个值

javascript - 使 Meteor 方法调用在客户端同步

java - RxAndroid : Observe an EditText while also observing touches on 2 Views

ios - Rx swift : Prevent multiple network requests

angular - 如果我们不订阅以 Angular 返回可观察对象的 HttpClient 请求,会发生什么