我目前正在使用 Rx 1。
我有以下测试用例:
static void printThread(String format, Object... objects) {
System.out.println(String.format("%s %s", Thread.currentThread().getName(),
String.format(format, objects)));
}
public void testFoo() throws InterruptedException {
Observable.fromCallable(() -> { printThread("callable"); return 1L;})
.subscribeOn(Schedulers.newThread())
.doOnSubscribe(() -> printThread("A"))
.doOnSubscribe(() -> printThread("B"))
.subscribeOn(Schedulers.newThread())
.doOnSubscribe(() -> printThread("C"))
.subscribeOn(Schedulers.newThread())
.doOnSubscribe(() -> printThread("D"))
.toBlocking()
.subscribe();
printThread("next!");
Completable.fromCallable(() -> { printThread("callable"); Thread.sleep(10_000); return 1L;})
.subscribeOn(Schedulers.newThread())
.doOnSubscribe(a -> printThread("A"))
.doOnSubscribe(a -> printThread("B"))
.subscribeOn(Schedulers.newThread())
.doOnSubscribe(a -> printThread("C"))
.subscribeOn(Schedulers.newThread())
.doOnSubscribe(a -> printThread("D"))
.andThen(Completable.fromAction(() -> printThread("E")))
.andThen(Completable.fromAction(() -> printThread("F")).subscribeOn(Schedulers.newThread()))
.await();
}
产生以下输出:
main D
RxNewThreadScheduler-1 C
RxNewThreadScheduler-2 B
RxNewThreadScheduler-2 A
RxNewThreadScheduler-3 callable
main next!
RxNewThreadScheduler-6 A
RxNewThreadScheduler-6 B
RxNewThreadScheduler-6 C
RxNewThreadScheduler-6 D
RxNewThreadScheduler-6 callable
RxNewThreadScheduler-6 E
RxNewThreadScheduler-7 F
Process finished with exit code 0
为什么 Observable
和 Completable
之间的订阅时间副作用的调度方式存在差异?
我认为发生的事情是,对于可观察的行为,产生的原因是订阅发生在默认的单线程调度模式下,除了调用 subscribeOn()
的地方,这就是为什么 A 和 B 发生在同一线程,但其他所有事情都发生在不同线程上。
但我不明白为什么要为 Completable 更改此行为。
最佳答案
RxJava 1 在这方面有点不一致。 1.x Completable.subscribeOn
在订阅时线程切换后调用 onSubscribe
,而使用 Observable
时,doOnSubscribe
得到在线程切换到上游之前调用。
使用 RxJava 2,它们现在是一致的:
main D
RxNewThreadScheduler-1 C
RxNewThreadScheduler-2 A
RxNewThreadScheduler-2 B
RxNewThreadScheduler-3 callable
main next!
main D
RxNewThreadScheduler-4 C
RxNewThreadScheduler-5 A
RxNewThreadScheduler-5 B
RxNewThreadScheduler-6 callable
关于java - 为什么 Completable 和 Observable 之间的订阅时间副作用安排不同?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51934671/