rxjs - `observeOn`和参数化调度程序之间的区别

标签 rxjs

我希望以下两段代码是等效的。默认情况下,Repeat 使用 currentThread 调度程序。如果我们将其更改为立即调度程序:

Rx.Observable.fromArray([1,2,3,4,5]).flatMap(a => {
  return Rx.Observable.repeat(a, 3, Rx.Scheduler.immediate)
})
.subscribe(r => console.log(r));

上面的代码产生预期的结果:1, 1, 1, 2, 2, 2, ...。但以下代码不会,并会生成一系列混合值:

Rx.Observable.fromArray([1,2,3,4,5]).flatMap(a => {
  return Rx.Observable.repeat(a, 3).observeOn(Rx.Scheduler.immediate)
})
.subscribe(r => console.log(r));

我不明白这种行为,但我想我错过了一些东西。 repeat 可以传递一个 Scheduler 参数,但我想我也可以通过使用 observeOn 来强制 Observable 作用于特定的 Scheduler。我错过了什么?

最佳答案

区别在于,第一个使用调度程序进行生成,第二个仅使用它进行传播。

在第二个版本中,您仍然使用currentThread来创建值。 observeOn 只会在值从前一个运算符发出后将值强制到不同的调度程序上,但对于生成事件的运算符,这不会影响这些事件的生成。

如果您查看某些创建运算符(例如 fromArray)的内部,您会看到类似以下内容的内容:

//Changing the scheduler will change how recursive scheduling works
scheduler.schedulerRecursiveWithState(0, function(self, state) {
      if (i < len) {
        observer.onNext(array[i]);
        //Schedule the next event
        self(i + 1);
      } else {
        observer.onCompleted();
      }
});

observeOn 类似于执行以下操作:

//Doesn't change when events get generated, simply reschedules them for down stream
source.subscribe(function(x) {
  scheduler.scheduleWithState(x, function(self, state) {
    observer.onNext(x);
  });
});

关于rxjs - `observeOn`和参数化调度程序之间的区别,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31003624/

相关文章:

javascript - Angular try catch 与捕获错误

rxjs - 你如何从 RXJS Observable retryWhen 抛出错误

javascript - Angular2 http重试逻辑

typescript - Angular 2 |订阅 Observables

javascript - 还是不明白 RXJS operator combineLatestAll 是如何工作的?

angular - ngrx 效果给出类型 'void' 不可分配给类型 'ObservableInput'

javascript - RxJS:每次返回并行 http 请求时更新客户端

node.js - 如何在 rxjs 中使用 Node 的转换流?

javascript - 仅使用 RxJs 重新实现 redux observable?

angular - 我有几个嵌套订阅。有什么好的方法来处理大量缩进吗?