在我的 Android 代码中,两个 View 需要同时以 io.reactivex.Flowable 形式提供的数据库中的一些数据。一个 View 需要纯数据,但另一个 View 应用映射函数,涉及一些 IO 并需要一些时间。
问题是两个订阅者只有在映射函数终止后才能获取数据。这会导致其中一个 View 为空白,而另一个 View 正在加载。
Flowable<Integer> rxRoomSourceFlowable = Flowable.fromArray(1, 2, 3).share();
rxRoomSourceFlowable
.map(integer -> {
// Some long running operation
Thread.sleep(5000);
return integer + "!";
})
.subscribeOn(Schedulers.io())
.subscribe(str -> System.out.println("Mapped subscriber: " + str));
rxRoomSourceFlowable
.subscribeOn(Schedulers.io())
.subscribe(integer -> System.out.println("Plain subscriber: " + integer));
输出:
Mapped subscriber: 1!
Plain subscriber: 1
Mapped subscriber: 2!
Plain subscriber: 2
Mapped subscriber: 3!
Plain subscriber: 3
正如您所看到的,普通订阅者仅在长时间运行的映射函数终止后才会收到通知,即使结果完全无关。调用 subscribe()
的顺序并不重要,因为订阅无论如何都是异步发生的。
如何确保普通订阅者在数据可用时立即得到通知,而不需要等待映射功能?与使用 map()
相比,是否有其他选项可以更改 Flowable
内的数据?
最佳答案
share()
以锁步方式分派(dispatch)事件,因此,如果其中一个订阅者延迟,另一个订阅者将无法及时获取项目。您必须将有问题的订阅者移至另一个线程并让共享
进度:
Flowable<Integer> rxRoomSourceFlowable =
Flowable.fromArray(1, 2, 3)
.subscribeOn(Schedulers.io())
.share();
rxRoomSourceFlowable
.observeOn(Schedulers.io())
.map(integer -> {
// Some long running operation
Thread.sleep(5000);
return integer + "!";
})
.subscribe(str -> System.out.println("Mapped subscriber: " + str));
rxRoomSourceFlowable
.subscribe(integer -> System.out.println("Plain subscriber: " + integer));
关于java - Flowable.map() 中长时间运行的操作会阻止所有订阅者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48600140/