我想使用 takeUntil
运算符完成一个不会自然完成的内部流,如下所示:
outerObservable
.mergeMap(() => {
innerObservable
.takeUntil(stopObservable$)
});
这有效,内部流按预期完成,但我希望外部流在停止信号后返回最后一个值。即使经过大量谷歌搜索,我仍然不知道该怎么做。
编辑:
我已经编写了一个似乎可以解决问题的运算符,但由于我知道有更好的方法或我完全误解的东西,所以将问题悬而未决。
function takeUntilThen(notifier, oneLastValue) {
return Rx.Observable.create(subscriber => {
var source = this;
notifier.subscribe(() => {
subscriber.next(oneLastValue);
subscriber.complete()
});
return source.subscribe(value => {
subscriber.next(value);
},
err => subscriber.error(err),
() => subscriber.complete());
});
}
最佳答案
听起来您想在 innerObservable
和 stopObservable
之间竞赛,无论哪个获胜,都应该能够输出一些东西。
为此,您可以使用恰当命名的 .race()
运算符,而不是使用 .takeUntil()
。请参阅有关取消的 redux-observable 文档配方中的示例:
const somethingEpic = action$ =>
action$.ofType(SOMETHING)
.mergeMap(action =>
innerObservable
.race(
stopObservable
.take(1)
)
);
因为你的例子是伪代码,下面是一个更具体的例子:
import { ajax } from 'rxjs/observable/dom/ajax';
const fetchUserEpic = action$ =>
action$.ofType(FETCH_USER)
.mergeMap(action =>
ajax.getJSON(`/api/users/${action.payload}`)
.map(response => fetchUserFulfilled(response))
.race(
action$.ofType(FETCH_USER_CANCELLED)
.map(() => incrementCounter())
.take(1)
)
);
关于javascript - 使用 takeUntil 完成一个 RxJS 流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41915339/