RxJS - 重试或重置

标签 rxjs rxjs5

假设我有一个像这样的序列:

Rx.Observable
.interval(1000)
.subscribe(data => {console.log(data)})

使用操作符,我如何“重新启动”序列,即取消订阅重新订阅

真实场景是序列是一个套接字流,在某些条件下我们需要取消订阅重新订阅,有点像retryWhen(errors) 有效,但没有错误...理想情况下是这样的...retryWhen(bool:Subject)

最佳答案

我会使用 switchMap() 来完成此操作,因为它会自动取消订阅旧的 Observable 并订阅新的 Observable。在本例中,我们将仅使用 .switchMap(() => source):

const subject = new Subject();

const source = Observable.create(obs => {
    console.log('Observable.create');
    obs.next(42);
});

subject.switchMap(() => source)
    .subscribe(v => console.log('next:', v));


setTimeout(() => subject.next(), 1000);
setTimeout(() => subject.next(), 5000);

这将打印以下内容:

Observable.create
next: 42
Observable.create
next: 42

您将拥有 WebSocket 源(或您拥有的任何内容),而不是 source

关于RxJS - 重试或重置,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44680701/

相关文章:

angular - 合并连续 Observables 结果的最佳方法是什么?

rxjs - 获取可能是也可能不是可观察值的值的优雅方法

http - Angular2 可观察的

rxjs - 使用 rxjs 5 运行平均值

angular - 我应该在 AngOnDestroy 中的 nexted 之后完成我的主题吗

angular - 将一个 Observable<string> 和一个 Array<Observable<string>> 组合成一个 Observable<{string, string[]}>

angular - 类型 Observable<Observable<any[]>> 不可分配给类型 Observable<any[]>

javascript - 了解 rxjs 中的背压 - 仅缓存 5 张等待上传的图像

angular - Rxjs Subject.asObservable() 不工作但 BehaviorSubject 工作......为什么?

javascript - RxJS:如何用 nock 测试 Observable.ajax() ?