javascript - Rxjs:使用 takeUntil(timer) 的 Observable 在计时器滴答后继续发射

标签 javascript rxjs

我遇到了一个非常奇怪的 takeUntil() 行为。我创建了一个可观察的计时器:

let finish = Observable.timer(3000);

然后我等一段时间再打电话

// 2500 ms later
someObservable.takeUntil(finish);

我希望所说的可观察对象在计时器“滴答”后停止发射,即在其创建后约 500 毫秒。实际上,它在创建后会持续发射 3000 毫秒,远远超过计时器“滴答”的那一刻。如果我使用包含绝对时间值的 Date 对象创建计时器,则不会发生这种情况。

这是设计使然吗?如果是,解释是什么?

这是完整的代码,可以用 node.js 运行(它需要 npm install rx):

let {Observable, Subject} = require("rx")
let start = new Date().getTime();
function timeMs() { return new Date().getTime() - start };

function log(name, value) { 
    console.log(timeMs(), name, value);
}

Observable.prototype.log = function(name) {
    this.subscribe( v=>log(name,v), 
                    err=>log(name, "ERROR "+err.message), 
                    ()=>log(name, "DONE"));
    return this;
}

let finish = Observable.timer(3000).log("FINISH");
setTimeout( ()=>Observable.timer(0,500).takeUntil(finish).log("seq"), 2500);

这会生成以下输出:

2539 'seq' 0
3001 'FINISH' 0
3005 'FINISH' 'DONE'
3007 'seq' 1
3506 'seq' 2
4006 'seq' 3
4505 'seq' 4
5005 'seq' 5
5506 'seq' 6
5507 'seq' 'DONE'

如果我使用绝对时间创建计时器:

let finish = Observable.timer(new Date(Date.now()+3000)).log("FINISH");

然后它的行为符合预期:

2533 'seq' 0
3000 'seq' 'DONE'
3005 'FINISH' 0
3005 'FINISH' 'DONE'

这种行为似乎在各种情况下都相当一致。例如如果您采用一个间隔并使用 mergeMap()switchMap() 创建子序列,结果将是相似的:子序列在完成事件之后继续发射。

想法?

最佳答案

您忘记了冷 Observables 的第一条规则:每个订阅都是一个新流。

您的log 运算符有错误;它订阅了一次 Observable(因此创建了第一个订阅),然后返回原始的 Observable,它被再次订阅,隐含地,当您将它传递给 takeUntil 运算符时。因此,实际上您实际上有两个 seq 流处于事件状态,它们的行为都正确。

它在绝对情况下有效,因为您基本上是将每个流设置为在特定时间发出,而不是订阅发生时的相对时间。

如果你想看到它的工作,我建议你将你的实现更改为:

let start = new Date().getTime();
function timeMs() { return new Date().getTime() - start };

function log(name, value) { 
    console.log(timeMs(), name, value);
}

Observable.prototype.log = function(name) {
    // Use do instead of subscribe since this continues the chain
    // without directly subscribing.
    return this.do(
      v=>log(name,v), 
      err=>log(name, "ERROR "+err.message), 
      ()=>log(name, "DONE")
    );
}

let finish = Observable.timer(3000).log("FINISH");

setTimeout(()=> 
  Observable.timer(0,500)
    .takeUntil(finish)
    .log("seq")
    .subscribe(), 
2500);

关于javascript - Rxjs:使用 takeUntil(timer) 的 Observable 在计时器滴答后继续发射,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41529062/

相关文章:

javascript - 获取两个 Moment 日期/时间之间的日期/时间

javascript - RxJs:经过一段时间后才有新值

javascript - RxJs zip 运算符在 xstream 中等效吗?

angular - RXJS- Angular : Two Observable<booleans> should return Observable<boolean>

javascript - 删除一个可观察对象是否也会删除对它的订阅?

javascript - 打开和关闭灯

javascript - Jquery 自定义函数与 CallBack

javascript - JS - 将对象数组传递到新窗口

javascript - 尝试继承 for 循环中按名称引用的所有类时出现问题

Angular NgRx-仅在首次调用时才继续轮询服务的效果