假设我有一个像这样的序列
:
Rx.Observable
.interval(1000)
.subscribe(data => {console.log(data)})
使用操作符
,我如何“重新启动
”序列,即取消订阅
和重新订阅
。
真实场景是序列
是一个套接字流,在某些条件下我们需要取消订阅
和重新订阅
,有点像retryWhen(errors)
有效,但没有错误...理想情况下是这样的...retryWhen(bool:Subject)
。
最佳答案
我会使用 switchMap() 来完成此操作,因为它会自动取消订阅旧的 Observable 并订阅新的 Observable。在本例中,我们将仅使用 .switchMap(() => source)
:
const subject = new Subject();
const source = Observable.create(obs => {
console.log('Observable.create');
obs.next(42);
});
subject.switchMap(() => source)
.subscribe(v => console.log('next:', v));
setTimeout(() => subject.next(), 1000);
setTimeout(() => subject.next(), 5000);
这将打印以下内容:
Observable.create
next: 42
Observable.create
next: 42
您将拥有 WebSocket 源(或您拥有的任何内容),而不是 source
。
关于RxJS - 重试或重置,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44680701/