javascript - RxJS - 收集异步操作结果

标签 javascript typescript reactive-programming rxjs observable

我想对数组的每个元素执行异步操作并将其结果收集在字典中。我目前的做法是:

let asyncOp = () => Rx.Observable.interval(300).take(1);
let dict = {};

Rx.Observable.from(['a', 'b'])
  .mergeMap(el => asyncOp()
              .map(asyncOpRes => dict[el] = asyncOpRes)
              .do(state => console.log('dict state: ', dict))
  )
  .takeLast(2)
  .take(1)
  .map(() => dict)
  .subscribe(res => console.log('dict result: ', res));
<script src="https://npmcdn.com/@reactivex/rxjs@5.0.0-beta.7/dist/global/Rx.umd.js"></script>

基本上这就像我想要的那样,但它似乎是 RxJs 运算符的尴尬用法。所以我需要以下方面的帮助:

  1. 避免 dict 突变(尝试使用 scan(),但不知道如何在这里使用它。有一个 mergeScan() 方法,但这里相同)
  2. takeLast 和 take 的使用 - 应该可以简化吗?

我认为我缺少一个 RxJS 运算符,它可以帮助我简化此操作。

最佳答案

要“对数组的每个元素执行异步操作并将其结果收集到字典中”,可以使用 mergeMap 显着简化代码。和 reduce功能:

import * as Rx from "rxjs/Rx";

const asyncOp = () => Rx.Observable.interval(300).take(1);

Rx.Observable.from(["a", "b"])

    // Perform the async operation on the values emitted from the
    // observable and map the emitted value and async result into
    // an object.

    .mergeMap((key) => asyncOp().map((result) => ({ key, result })))

    // Use reduce to build an object containing the emitted values
    // (the keys) and the async results.

    .reduce((acc, value) => { acc[value.key] = value.result; return acc; }, {})
    .subscribe((value) => { console.log(value); });

关于javascript - RxJS - 收集异步操作结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39353780/

相关文章:

javascript - 轻量级的scrollUp函数——jQuery

javascript - Sails.js:如何实际运行测试

typescript - Typescript 中类的公共(public)访问修饰符的目的是什么?

typescript - 为什么 Typescript 允许类型切片?

r - 在适用于 R 的 Shiny 应用程序中,如何延迟响应式的触发?

javascript - 使用canvas HTML5 进行图像幻灯片放映

javascript - 在数组 JavaScript 的键值对中输出不为空的键值对

typescript - 使用 typescript 向 AWS Elasticsearch Service 发出签名请求

javascript - 如何手动销毁ReactiveDict?

java - Vertx 线程模型