javascript - 使用 takeUntil 完成一个 RxJS 流

标签 javascript rxjs system.reactive redux-observable

我想使用 takeUntil 运算符完成一个不会自然完成的内部流,如下所示:

  outerObservable
    .mergeMap(() => {
        innerObservable
            .takeUntil(stopObservable$)
    });

这有效,内部流按预期完成,但我希望外部流在停止信号后返回最后一个值。即使经过大量谷歌搜索,我仍然不知道该怎么做。

编辑:

我已经编写了一个似乎可以解决问题的运算符,但由于我知道有更好的方法或我完全误解的东西,所以将问题悬而未决。

function takeUntilThen(notifier, oneLastValue) {
    return Rx.Observable.create(subscriber => {
        var source = this;
        notifier.subscribe(() => {
            subscriber.next(oneLastValue);
            subscriber.complete()
        });
        return source.subscribe(value => {
            subscriber.next(value);
        },
        err => subscriber.error(err),
        () => subscriber.complete());
    });
}

最佳答案

听起来您想在 innerObservablestopObservable 之间竞赛,无论哪个获胜,都应该能够输出一些东西

为此,您可以使用恰当命名的 .race() 运算符,而不是使用 .takeUntil()。请参阅有关取消的 redux-observable 文档配方中的示例:

https://redux-observable.js.org/docs/recipes/Cancellation.html#cancel-and-do-something-else-eg-emit-a-different-action

const somethingEpic = action$ =>
  action$.ofType(SOMETHING)
    .mergeMap(action =>
      innerObservable
        .race(
          stopObservable
            .take(1)
        )
    );

因为你的例子是伪代码,下面是一个更具体的例子:

import { ajax } from 'rxjs/observable/dom/ajax';

const fetchUserEpic = action$ =>
  action$.ofType(FETCH_USER)
    .mergeMap(action =>
      ajax.getJSON(`/api/users/${action.payload}`)
        .map(response => fetchUserFulfilled(response))
        .race(
          action$.ofType(FETCH_USER_CANCELLED)
            .map(() => incrementCounter())
            .take(1)
        )
    );

关于javascript - 使用 takeUntil 完成一个 RxJS 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41915339/

相关文章:

javascript - 使用传递的变量加载快速模板返回错误 `Cannot set headers after they are sent to the client`

javascript - 递归检查字符串是否为回文的算法的时间复杂度是多少?

c# - Rx groupby 直到条件改变

javascript - 渐进(或迭代)回调

rxjs - 无需 RXJS 即可观察到兼容性

javascript - rxjs 可观察到的 : Using some kind of await?

javascript - 如何基于 rxjs 中现有的 observable 创建一个新的 observable 订阅?

c# - 重放值或错误的惰性可观察序列

c# - 使用 Reactive Extensions 时命名线程

javascript - 你如何根据浏览器宽度用 javascript 更改 img src?