rx-java - flatMap() observable 上的调度程序会影响外部 observable 上的调度程序吗?

标签 rx-java rx-java2

这似乎是一个有点愚蠢的问题,我已经做了一些测试来看看它在实践中是如何工作的,但我希望看到它得到证实,如果可能的话我想知道为什么Observable 合约及其实现,因为这感觉像是我对 Rx 规则的理解中的一个令人讨厌的漏洞。另外,如果您能告诉我在哪里可以找到这个,那么它将帮助我将来自己回答这些问题。

如果我使用以下 Observable:

Observable.range(0, 3)
          .observeOn(schedulerA)
          .flatMap(i -> Observable.just(i)
                                  .observeOn(schedulerB)
                                  .map(j -> -j))
          .doOnNext(i -> System.out.println(String.format("Got %d", i)))
          .subscribe()

然后运算符 .doOnNext(i -> System.out.println(String.format("Got %d", i))) 会在 schedulerA 上执行> 或 schedulerB 是否有正式或基于规范的原因来说明原因?

谢谢。

最佳答案

无法保证哪个调度程序 doOnNext 将运行。从概率的角度来看,它可能是其中之一。

原因是 flatMap 运算符使用先进的快速路径和工作窃取算法,该算法可以由任一线程触发,并且也将执行另一个线程的一些任务。

随着简单的内部 Observable 快速完成,schedulerA 可能仍在处理其订阅并检测到来自内部源的值可用,从而发出它位于同一个 schedulerA 线程上。其他时候,schedulerA 在生成项目之前完成内部 Observable 的处理。在这种情况下,schedulerB 将在 flatMap 内触发发射。

通常,当您不确定哪个线程将处理您的 doOnNext 时,请始终在其前面应用一个 observeOn 以及所需的调度程序。

关于rx-java - flatMap() observable 上的调度程序会影响外部 observable 上的调度程序吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51677721/

相关文章:

android - RxJava返回错误为onNext并继续流

java - RxJava 中的任务取消是如何工作的?

android - RxJava 链中的多个 subscribeOn()

android - Rxjava - 如何正确检查 Maybe/Observable 是否为空?

android - RxView.clicks() 在 onError 事件后死亡

reactive-programming - 如何使用 RxJava 串行批处理长进程?

java - 如何在RxJava中实现链锁

java - 向特征写入多个命令

rx-java - RxJava 中的 EventLoopScheduler 等价物?

rx-java - 如何避免 Rx 流中的重复网络调用?