RxJS:是否有一个运算符像 mergeScan 一样工作,但只是在外部流发出时取消订阅内部流

标签 rxjs

我想要一个作为 mergeScanswitchMap 组合工作的运算符。它维护订阅的最后一个内部流的最后一个值,但是当外部流发出时,它只是使用最新值作为累加器并取消订阅最后一个内部流。

s$.pipe(
  someOp((acc, cur) => {
    if (e) {
      return a$.pipe(
        scan((acc1, cur1) => f(acc1, cur1), acc),
      );
    } else {
      return b$.pipe(
        scan((acc1, cur1) => g(acc1, cur1), acc),
      );
    }
  }, init));

应该作为:

s$:      ---T----F----
a$:      -a---a----a--
b$:      --b---b----b-

result$: -----x-----w-
where

    x = f(init, a)
    w = g(x, b)

换句话说,它不应该等待内部流完成。

我知道我可以实现个人订阅者和运营商,但这可以作为原始 RxJS 运营商的管道来实现吗?

最佳答案

我认为您可以在 mergeScan 上使用“并发”参数...

mergeScan(accumulatorFn, init, 1)

concurrent 限制订阅输入observables的数量,默认为正无穷大,设置为1就只有1个。

文档:https://rxjs-dev.firebaseapp.com/api/operators/mergeScan

关于RxJS:是否有一个运算符像 mergeScan 一样工作,但只是在外部流发出时取消订阅内部流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57529771/

相关文章:

angular - rxjs observable.interval();在浏览器中抛出不是函数异常

javascript - RxJs:如何根据可观察对象的状态进行循环?

javascript - RXJS 6 处理 http Observables 的方式是什么?

angular - 从 Angular 5 升级到 6 后,我遇到了很多 rxjs 错误

javascript - RxJS 可观察到的 : retry a Promise that fail

javascript - FirebaseListObservable 查询后变为 Observable

javascript - Rx debounce operator with first 和 last

Angular 6 zip 已弃用 : resultSelector is no longer supported, 管道改为映射

angular - 类型错误 : Object doesn't support property or method 'map' - AngularJS2

typescript - 为什么 takeWhile 运算符在包含时不过滤?