我想要一个作为 mergeScan
和 switchMap
组合工作的运算符。它维护订阅的最后一个内部流的最后一个值,但是当外部流发出时,它只是使用最新值作为累加器并取消订阅最后一个内部流。
s$.pipe(
someOp((acc, cur) => {
if (e) {
return a$.pipe(
scan((acc1, cur1) => f(acc1, cur1), acc),
);
} else {
return b$.pipe(
scan((acc1, cur1) => g(acc1, cur1), acc),
);
}
}, init));
应该作为:
s$: ---T----F----
a$: -a---a----a--
b$: --b---b----b-
result$: -----x-----w-
where
x = f(init, a)
w = g(x, b)
换句话说,它不应该等待内部流完成。
我知道我可以实现个人订阅者和运营商,但这可以作为原始 RxJS 运营商的管道来实现吗?
最佳答案
我认为您可以在 mergeScan 上使用“并发”参数...
mergeScan(accumulatorFn, init, 1)
concurrent 限制订阅输入observables的数量,默认为正无穷大,设置为1就只有1个。
关于RxJS:是否有一个运算符像 mergeScan 一样工作,但只是在外部流发出时取消订阅内部流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57529771/