RXJS 等待所有 Observables 完成并返回结果

标签 rxjs ngrx

我正在尝试创建一个 RX 流,它将执行异步的 XHR 调用列表,然后等待它们完成,然后再进行下一次调用。

为了帮助解释这可以在普通 JS 中这样写:

try {
    await* [
        ...requests.map(r => angularHttpService.get(`/foo/bar/${r}`))
    ];
} catch(e) { throw e }

// do something

这是我正在尝试的代码,但它单独运行它们,而不是在继续之前等待它们全部完成。 (这是一个 NGRX 效果流,因此它与 vanilla rx 略有不同)。

mergeMap(
        () => this.requests, concatMap((resqests) => from(resqests))),
        (request) =>
            this.myAngularHttpService
                .get(`foo/bar/${request}`)
                .pipe(catchError(e => of(new HttpError(e))))
    ),
    switchMap(res => new DeleteSuccess())

最佳答案

您可以使用 forkJoin ,它将从每个已完成的 observable 中发出最后发出的值。以下是链接文档中的示例:

    import { mergeMap } from 'rxjs/operators';
    import { forkJoin } from 'rxjs/observable/forkJoin';
    import { of } from 'rxjs/observable/of';

    const myPromise = val =>
      new Promise(resolve =>
        setTimeout(() => resolve(`Promise Resolved: ${val}`), 5000)
      );

    const source = of([1, 2, 3, 4, 5]);
    //emit array of all 5 results
    const example = source.pipe(mergeMap(q => forkJoin(...q.map(myPromise))));
    /*
      output:
      [
       "Promise Resolved: 1",
       "Promise Resolved: 2",
       "Promise Resolved: 3",
       "Promise Resolved: 4",
       "Promise Resolved: 5"
      ]
    */
    const subscribe = example.subscribe(val => console.log(val));


也有这个不错的食谱彼得 B 史密斯 ,也使用 forkJoin对于相同的提议:
  • Making chained API Calls using @ngrx/Effects
  • 关于RXJS 等待所有 Observables 完成并返回结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48666086/

    相关文章:

    angular - 如何将 bool 主题的值初始化为 false?

    javascript - 如何从 rxjs Observable 获取刻度数

    javascript - AngularFirebaseAuth : Calling server api just after firebase auth?

    rest - 使用 Angular2 ngrx 的类似 Twitter 的应用程序。构建 AppState

    angular - NgRx reducer 中的拼接索引不会删除一个索引

    angular - 引用错误 : Rx is not defined

    javascript - 拆分 RxJS 可观察输出

    angular - 无法添加属性 _$visited,对象不可使用 primeNG、angular 2 和 @ngrx 扩展

    angular - 覆盖 NGRX DefaultPersistenceResultHandler 的 handleSuccess

    angular - ngrx 中的 reducer 和不变性