rx-java2 - 具有不同调度程序的 flatMap 的行为

标签 rx-java2

你能解释一下为什么我在 flatMap 中更改返回的 Observable 的调度程序时会得到奇怪的输出吗?例如,我有

Observable.range(1, 9)
        .flatMap {
            if (it < 5) {
                Observable.just(it)
                        .subscribeOn(Schedulers.io())
                        .observeOn(Schedulers.io())
            } else {
                Observable.just(it)
            }
        }
        .subscribe({ println("${it}: ${Thread.currentThread().name}") })
println("END")
Thread.sleep(200)

作为输出,我每次运行都有不同的结果。例如。
第一次发射
1: RxCachedThreadScheduler-3
2: RxCachedThreadScheduler-3
3: RxCachedThreadScheduler-3
5: main
6: main
7: main
END
4: RxCachedThreadScheduler-6
8: RxCachedThreadScheduler-6
9: RxCachedThreadScheduler-6

第二次发射输出:
5: main
1: main
2: main
3: main
6: main
7: main
8: main
9: main
END
4: RxCachedThreadScheduler-8

最佳答案

flatMap非确定性地在参与线程之一上合并,因此即使内部源具有 subscribeOn和/或 observeOn定义,不能保证哪个线程将在特定时刻赢得从源发出项目。因此,您必须申请observeOnflatMap如果您想确保后续事件处理发生在所需的线程上(直到有另一个异步边界运算符)。

关于rx-java2 - 具有不同调度程序的 flatMap 的行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45903743/

相关文章:

android - Android Retrofi 中 SingleObserver onSubscribe 函数总是返回 null

java - 订阅者如何通过 react 性拉背压来控制发布者?

rx-java - 如何在没有 .flatMap 的情况下控制流,这会破坏 react 流,从而阻止 distinctUntilChanged 等运算符在整个流上工作

java - Couchbase 错误 : rx. exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value

android - 使用改造和 rx java 的多个 api 请求

java - 将 Android Room 数据库与 RXJava2 一起使用时找不到符号类 EmptyResultSetException

rx-java2 - 即使提供了 doOnerror,RxJava2 Single 也不处理错误

java - RxJava2 上的 MVP。何时使用它

java - 改造 2 - RxJava : Unable to invoke no-args constructor for retrofit2. 调用 <RemoteDataObjectModel>

java - RxJava 调度程序始终与 sleep 在同一线程中工作