我有两个源可观察对象,我需要在一个源可观察对象发出后立即计算一些数据。我正在尝试使用 combineAll()
运算符,但它仅在每个源可观察对象首次发出时发出一个值。
是否有任何类似于 combineAll()
的运算符在任何源可观察对象首次发出时立即发出?如果不是,最明确的方法是什么?
我尝试过的:
const source1$ = service.getSomeData();
const source2$ = service.getOtherData();
combineLatest(
source1$,
source2$
).pipe(
map([source1Data, source2Data] => {
// this code only gets executed when both observables emits for the first time
return source1Data + source2Data;
})
)
最佳答案
如果我没理解错的话,您需要如下图所示的模式:
stream1$ => ------ 1 ------ 12 -----------------------
stream2$ => ------------------------- 30 -------------
result$ => ------ 1 ------ 12 ------ 42 --------------
如果一个值可用,则发出该值。如果两者都可用,则发出两者的组合,在本例中为简单求和 (12 + 30 = 42);
首先是输入流,为了本示例,我将它们设为主题,因此我们可以手动推送数据:
const stream1$ = new Subject();
const stream2$ = new Subject();
接下来我们将组合输入,首先通过 startWith 运算符进行管道传输。这确保 combineLatest 产生一个立即发出的 observable - 准确地说是 [null, null]
。
const combined$ = combineLatest(
stream1$.pipe(startWith(null)),
stream2$.pipe(startWith(null)),
);
现在您有一个始终发出长度为 2 的数组的可观察对象,其中包含您的数据(本例中为数字)和 null 的任意组合,如下图所示:
stream1$ | startWith(NULL) => NULL ----------- 1 ----------- 12 ----------------------------
stream2$ | startWith(NULL) => NULL ---------------------------------------- 30 -------------
combined$ [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
最后,您可以检查并将此输出映射
为您想要的格式:两个数字的总和(如果两个数字都可用)或第一个可用的值:
const processedCombinations$ = combined$.pipe(
map(([data1, data2]) => {
if (data1 === null) return data2;
if (data2 === null) return data1;
return data1 + data2;
}),
);
结果:
combined$ => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
processedCombinations$ => NULL ----------- 1 ----------- 12 ----------- 42 -------------
仍然存在一个问题:combined$
发出的第一个值是 [null, null]
,导致 processedCombinations$
发出 null
最初。解决此问题的一种方法是使用 skipWhile
将另一个管道链接到 processedCombinations$
上:
const final$ = processedCombinations$.pipe(skipWhile((input) => input === null));
结果:
combined$ => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
processedCombinations$ => NULL ----------- 1 ----------- 12 ----------- 42 -------------
final$ => ---------------- 1 ----------- 12 ----------- 42 -------------
另一种 - 我觉得更好 - 方法是在 processedCombinations$
(现在实际上是 final$
)从它创建之前过滤 combined$
流:
const combinedFiltered$ = combined$.pipe(
filter(([first, second])=> first !== null || second !== null),
);
const final$ = combinedFiltered$.pipe(
map(([data1, data2]) => {
if (data1 === null) return data2;
if (data2 === null) return data1;
return data1 + data2;
}),
);
相应的图表很好地显示了如何尽可能早地在流层次结构中消除不相关的值:
combined$ => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
combinedFiltered$ => ---------------- [1, NULL] --- [12, NULL] --- [12, 30] -------
final$ => ---------------- 1 ----------- 12 ----------- 42 -------------
上面的图表可以用这段代码生成:
final$.subscribe(console.log);
stream1$.next(1);
// logs: 1
stream1$.next(12);
// logs: 12
stream2$.next(30);
// logs: 42
使用的进口:
import { combineLatest, Subject } from 'rxjs';
import { filter, map, skipWhile, startWith } from 'rxjs/operators';
关于javascript - RxJS combineLatest 无需等待源可观察量发出?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54039425/