javascript - RxJS promise 链

标签 javascript promise reactive-programming rxjs

我对 RxJS 还很陌生。所以我有一个流,为不同的 ajax 调用创建有效负载,然后我使用 flatMap 来检索我需要的数据,它工作正常。简单。

const streamA = Rx.Observable.from(array);

const streamB = streamA
  .map( val => /* build payload */ )
  .flatMap( payload => Rx.Observable.fromPromise($.ajax(payload))

streamB.subscribe( result => /* got it */)

现在,我将为每个项目创建一个有效负载数组,但问题是,现在当我订阅流时,我会返回每个请求,但完成后我只会返回初始元素。

const streamC = streamA
  .flatMap( payloads => {
    return Rx.Observable.from(payloads)
      .flatMap( payload => Rx.Observable.fromPromise($.ajax(payload))

streamC.subscribe( result => /* executed for every payload */)

我尝试使用返回正确分组数组的groupBy,并向我展示了我可以链接Observable,但我仍然不知道如何正确订阅观察者来满足元素。

const streamWLF = streamA
  .flatMap( payloads => {
    return Rx.Observable.from(payloads)
      .flatMap( payload => Rx.Observable.fromPromise($.ajax(payload))
      .groupBy((obs) => obs.key, (obs) => obs)

streamWLF.subscribe( result => {
  result.subscribe(/* did my magic here*/);
})

所以我的问题是,哪种方法最好?

当收到子流项时,订阅的主流是否总是触发?

如果可以的话,如何订阅subStream才能在subStream完成时才触发订阅的mainStream?

最佳答案

我认为您正在寻找 forkJoin运算符。

const streamC = streamA
  .flatMap(payloads => Rx.Observable.forkJoin(payloads));

streamC.subscribe( results => /*An array containing the results from all payloads*/);

关于javascript - RxJS promise 链,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35291307/

相关文章:

javascript - 我无法在 Angular 中显示新对象

javascript - 无法测试已解决 promise 的效果

javascript - ReactNative native-modules Promise 传递字段和 Swift

javascript - JS 中 Promise 和回调的问题

ios - 如何使用 RxSwift 的 Groupby 运算符将 GroupedObservable<String, Message> 转换为 SectionModel<String, Message>?

javascript - JS 中的 react 流如何工作?

javascript - Jquery 在 id 容器中隐藏 3 个 div 中的 2 个

javascript - 防止 Jade 展平文件夹结构

javascript - 如何优雅地停止html5视频播放

java - Spring WebFlux,单元测试Mono和Flux