javascript - RxJS combineLatest 无需等待源可观察量发出?

标签 javascript rxjs

我有两个源可观察对象,我需要在一个源可观察对象发出后立即计算一些数据。我正在尝试使用 combineAll() 运算符,但它仅在每个源可观察对象首次发出时发出一个值。

是否有任何类似于 combineAll() 的运算符在任何源可观察对象首次发出时立即发出?如果不是,最明确的方法是什么?

我尝试过的:

const source1$ = service.getSomeData();
const source2$ = service.getOtherData();

combineLatest(
  source1$,
  source2$
).pipe(
  map([source1Data, source2Data] => {
    // this code only gets executed when both observables emits for the first time
    return source1Data + source2Data;
  })
)

最佳答案

如果我没理解错的话,您需要如下图所示的模式:

stream1$ => ------ 1 ------ 12 -----------------------
stream2$ => ------------------------- 30 -------------

result$  => ------ 1 ------ 12 ------ 42 --------------

如果一个值可用,则发出该值。如果两者都可用,则发出两者的组合,在本例中为简单求和 (12 + 30 = 42);

首先是输入流,为了本示例,我将它们设为主题,因此我们可以手动推送数据:

const stream1$ = new Subject();
const stream2$ = new Subject();

接下来我们将组合输入,首先通过 startWith 运算符进行管道传输。这确保 combineLatest 产生一个立即发出的 observable - 准确地说是 [null, null]

const combined$ = combineLatest(
  stream1$.pipe(startWith(null)),
  stream2$.pipe(startWith(null)),
);

现在您有一个始终发出长度为 2 的数组的可观察对象,其中包含您的数据(本例中为数字)和 null 的任意组合,如下图所示:

stream1$ | startWith(NULL) => NULL ----------- 1 ----------- 12 ----------------------------
stream2$ | startWith(NULL) => NULL ---------------------------------------- 30 -------------

combined$                     [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------

最后,您可以检查并将此输出映射为您想要的格式:两个数字的总和(如果两个数字都可用)或第一个可用的值:

const processedCombinations$ = combined$.pipe(
  map(([data1, data2]) => {
    if (data1 === null) return data2;
    if (data2 === null) return data1;

    return data1 + data2;
  }),
);

结果:

combined$                  => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
processedCombinations$     => NULL ----------- 1 ----------- 12 ----------- 42 -------------

仍然存在一个问题:combined$ 发出的第一个值是 [null, null],导致 processedCombinations$ 发出 null 最初。解决此问题的一种方法是使用 skipWhile 将另一个管道链接到 processedCombinations$ 上:

const final$ = processedCombinations$.pipe(skipWhile((input) => input === null));

结果:

combined$                  => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
processedCombinations$     => NULL ----------- 1 ----------- 12 ----------- 42 -------------
final$                     => ---------------- 1 ----------- 12 ----------- 42 -------------

另一种 - 我觉得更好 - 方法是在 processedCombinations$ (现在实际上是 final$)从它创建之前过滤 combined$ 流:

const combinedFiltered$ = combined$.pipe(
    filter(([first, second])=> first !== null || second !== null),
);

const final$ = combinedFiltered$.pipe(
    map(([data1, data2]) => {
        if (data1 === null) return data2;
        if (data2 === null) return data1;

        return data1 + data2;
    }),
);

相应的图表很好地显示了如何尽可能早地在流层次结构中消除不相关的值:

combined$                  => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
combinedFiltered$          => ---------------- [1, NULL] --- [12, NULL] --- [12, 30] -------
final$                     => ---------------- 1 ----------- 12 ----------- 42 -------------

上面的图表可以用这段代码生成:

final$.subscribe(console.log);

stream1$.next(1);
// logs: 1

stream1$.next(12);
// logs: 12

stream2$.next(30);
// logs: 42

使用的进口:

import { combineLatest, Subject } from 'rxjs';
import { filter, map, skipWhile, startWith } from 'rxjs/operators';

关于javascript - RxJS combineLatest 无需等待源可观察量发出?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54039425/

相关文章:

typescript - Angular 2日期反序列化

Angular 5 在方法上使用 debounceTime

javascript - 使用 AJAX 解析 JSON

带有 document.querySelectorAll 和 filter() 的 javascript 函数无法正常工作

javascript - Spotify 桌面应用程序是用 javascript 开发的吗?

javascript - 如何使用rxjs尽早逃脱捕获链?

javascript - 在 RxJS 中,为什么每个订阅都会执行一次管道?

javascript - 在 JavaScript/TypeScript 中检测字符串中的模式

javascript - 未提供范围键时出现batchGetItem错误

rxjs - shareReplayLatestWhileConnected with RxJS