java - 为什么 Completable 和 Observable 之间的订阅时间副作用安排不同?

标签 java rx-java reactive

我目前正在使用 Rx 1。

我有以下测试用例:

static void printThread(String format, Object... objects) {
    System.out.println(String.format("%s %s", Thread.currentThread().getName(),
            String.format(format, objects)));
}

public void testFoo() throws InterruptedException {
    Observable.fromCallable(() -> { printThread("callable"); return 1L;})
              .subscribeOn(Schedulers.newThread())
              .doOnSubscribe(() -> printThread("A"))
              .doOnSubscribe(() -> printThread("B"))
              .subscribeOn(Schedulers.newThread())
              .doOnSubscribe(() -> printThread("C"))
              .subscribeOn(Schedulers.newThread())
              .doOnSubscribe(() -> printThread("D"))
              .toBlocking()
              .subscribe();

    printThread("next!");

    Completable.fromCallable(() -> { printThread("callable"); Thread.sleep(10_000); return 1L;})
              .subscribeOn(Schedulers.newThread())
              .doOnSubscribe(a -> printThread("A"))
              .doOnSubscribe(a -> printThread("B"))
              .subscribeOn(Schedulers.newThread())
              .doOnSubscribe(a -> printThread("C"))
              .subscribeOn(Schedulers.newThread())
              .doOnSubscribe(a -> printThread("D"))
               .andThen(Completable.fromAction(() -> printThread("E")))
               .andThen(Completable.fromAction(() -> printThread("F")).subscribeOn(Schedulers.newThread()))
               .await();
}

产生以下输出:

main D
RxNewThreadScheduler-1 C
RxNewThreadScheduler-2 B
RxNewThreadScheduler-2 A
RxNewThreadScheduler-3 callable
main next!
RxNewThreadScheduler-6 A
RxNewThreadScheduler-6 B
RxNewThreadScheduler-6 C
RxNewThreadScheduler-6 D
RxNewThreadScheduler-6 callable
RxNewThreadScheduler-6 E
RxNewThreadScheduler-7 F

Process finished with exit code 0

为什么 ObservableCompletable 之间的订阅时间副作用的调度方式存在差异?

我认为发生的事情是,对于可观察的行为,产生的原因是订阅发生在默认的单线程调度模式下,除了调用 subscribeOn() 的地方,这就是为什么 A 和 B 发生在同一线程,但其他所有事情都发生在不同线程上。

但我不明白为什么要为 Completable 更改此行为。

最佳答案

RxJava 1 在这方面有点不一致。 1.x Completable.subscribeOn 在订阅时线程切换后调用 onSubscribe,而使用 Observable 时,doOnSubscribe 得到在线程切换到上游之前调用。

使用 RxJava 2,它们现在是一致的:

main D
RxNewThreadScheduler-1 C
RxNewThreadScheduler-2 A
RxNewThreadScheduler-2 B
RxNewThreadScheduler-3 callable
main next!
main D
RxNewThreadScheduler-4 C
RxNewThreadScheduler-5 A
RxNewThreadScheduler-5 B
RxNewThreadScheduler-6 callable

关于java - 为什么 Completable 和 Observable 之间的订阅时间副作用安排不同?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51934671/

相关文章:

java - 使用java查询mongo oplog的时间戳字段

android - 在 t 秒内点击 n 次后切换状态

android - 如果作为 RxJava Observable 提供,Kotlin 密封类子类需要强制转换为基类

ios - 使用 RxSwift 的 combineLatest 中超过 8 个参数

java - 替换字符串中的字符

java - 显示名称包含特殊字符的图像

java - 调用辅助方法/函数

android - rxJava 将 List 转换为 Map

Swift - Bond Framework - 一对多绑定(bind)关系

java - RXJava - concat 不起作用