你能解释一下为什么我在 flatMap 中更改返回的 Observable 的调度程序时会得到奇怪的输出吗?例如,我有
Observable.range(1, 9)
.flatMap {
if (it < 5) {
Observable.just(it)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
} else {
Observable.just(it)
}
}
.subscribe({ println("${it}: ${Thread.currentThread().name}") })
println("END")
Thread.sleep(200)
作为输出,我每次运行都有不同的结果。例如。
第一次发射
1: RxCachedThreadScheduler-3
2: RxCachedThreadScheduler-3
3: RxCachedThreadScheduler-3
5: main
6: main
7: main
END
4: RxCachedThreadScheduler-6
8: RxCachedThreadScheduler-6
9: RxCachedThreadScheduler-6
第二次发射输出:
5: main
1: main
2: main
3: main
6: main
7: main
8: main
9: main
END
4: RxCachedThreadScheduler-8
最佳答案
flatMap
非确定性地在参与线程之一上合并,因此即使内部源具有 subscribeOn
和/或 observeOn
定义,不能保证哪个线程将在特定时刻赢得从源发出项目。因此,您必须申请observeOn
后 flatMap
如果您想确保后续事件处理发生在所需的线程上(直到有另一个异步边界运算符)。
关于rx-java2 - 具有不同调度程序的 flatMap 的行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45903743/