java - Flowable.map() 中长时间运行的操作会阻止所有订阅者

标签 java rx-java reactive-programming rx-android

在我的 Android 代码中,两个 View 需要同时以 io.reactivex.Flowable 形式提供的数据库中的一些数据。一个 View 需要纯数据,但另一个 View 应用映射函数,涉及一些 IO 并需要一些时间。

问题是两个订阅者只有在映射函数终止后才能获取数据。这会导致其中一个 View 为空白,而另一个 View 正在加载。

Flowable<Integer> rxRoomSourceFlowable = Flowable.fromArray(1, 2, 3).share();

rxRoomSourceFlowable
    .map(integer -> {
        // Some long running operation
        Thread.sleep(5000);
        return integer + "!";
    })
    .subscribeOn(Schedulers.io())
    .subscribe(str -> System.out.println("Mapped subscriber: " + str));

rxRoomSourceFlowable
    .subscribeOn(Schedulers.io())
    .subscribe(integer -> System.out.println("Plain subscriber: " + integer));

输出:

Mapped subscriber: 1!
Plain subscriber: 1
Mapped subscriber: 2!
Plain subscriber: 2
Mapped subscriber: 3!
Plain subscriber: 3

正如您所看到的,普通订阅者仅在长时间运行的映射函数终止后才会收到通知,即使结果完全无关。调用 subscribe() 的顺序并不重要,因为订阅无论如何都是异步发生的。

如何确保普通订阅者在数据可用时立即得到通知,而不需要等待映射功能?与使用 map() 相比,是否有其他选项可以更改 Flowable 内的数据?

最佳答案

share() 以锁步方式分派(dispatch)事件,因此,如果其中一个订阅者延迟,另一个订阅者将无法及时获取项目。您必须将有问题的订阅者移至另一个线程并让共享进度:

Flowable<Integer> rxRoomSourceFlowable = 
    Flowable.fromArray(1, 2, 3)
    .subscribeOn(Schedulers.io())
    .share();

rxRoomSourceFlowable
.observeOn(Schedulers.io())
.map(integer -> {
    // Some long running operation
    Thread.sleep(5000);
    return integer + "!";
})
.subscribe(str -> System.out.println("Mapped subscriber: " + str));

rxRoomSourceFlowable
.subscribe(integer -> System.out.println("Plain subscriber: " + integer));

关于java - Flowable.map() 中长时间运行的操作会阻止所有订阅者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48600140/

相关文章:

java - 如何将多个值添加到 TreeMap 中的同一个键?

Java正则表达式从日志文件中获取子度量

r - 具有动态文本的 ShinyApp 出现在多个 UI 元素的任何更改上

java - 如何从 Observable.fromIterable() 获得最终结果?

C# Reactive Extensions - 我应该退回一次性的吗?

r - Shiny 的节点 react 性依赖树

java - 如何跟踪你的android代码执行的位置?

java - 显示屏幕截图作为 pdf 下载链接

java - RxAndroid -BehaviorSubject 未在 onNext 上发出

java - RxJava onErrorResumeNext 调度器