java - 为什么 subscribeOn 方法不切换上下文?

标签 java project-reactor

我的这个测试基于 4.5.2. The subscribeOn Method (忽略缺少的 new Thread(...) 部分):

    @Test
    public void theSubscribeOnMethod() throws InterruptedException {
        Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);

        final Flux<String> flux = Flux
                .range(1, 2)
                .map(i -> 10 + i)
                .subscribeOn(s)
                .doOnSubscribe(sub -> System.out.println(
                        "[doOnSubscribe] " + Thread.currentThread().getName()))
                .map(i -> Thread.currentThread().getName() + ", value " + i);

        flux.subscribe(System.out::println);

        Thread.sleep(1000);
    }

打印此内容(注意 main 线程 [doOnSubscribe] ):
[doOnSubscribe] main
parallel-scheduler-1, value 11
parallel-scheduler-1, value 12

刚搬家subscribeOndoOnSubscribe产生这个结果(从我的角度来看是正确的):
[doOnSubscribe] parallel-scheduler-1
parallel-scheduler-1, value 11
parallel-scheduler-1, value 12

根据Flux.subscribeOn文档:

Run subscribe, onSubscribe and request on a specified Scheduler's Scheduler.Worker. As such, placing this operator anywhere in the chain will also impact the execution context of onNext/onError/onComplete signals from the beginning of the chain up to the next occurrence of a publishOn.



所以在这个例子中我期望 doOnSubscribe发生在 parallel-scheduler线程而不是 main尽管我放的地方subscribeOn .

这是一个错误吗?为什么会发生这种情况,我该如何解决?

PS:使用 reactor-core:3.1.0-RELEASE

最佳答案

这不是一个错误。这是预期的行为。这是事件的顺序,乍一看可能会令人困惑。

我稍微修改了你的例子:

Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);

final Flux<String> flux = Flux
        .range(1, 2)
        .map(i -> 10 + i)
        .subscribeOn(s)
        .doOnNext(value -> System.out.println(
                "[doOnNext] " + Thread.currentThread().getName() + " value: " + value))
        .doOnSubscribe(sub -> System.out.println(
                "[doOnSubscribe] " + Thread.currentThread().getName()));

flux.subscribe();

Thread.sleep(1000);

输出:
[doOnSubscribe] main
[doOnNext] parallel-scheduler-1 value: 11
[doOnNext] parallel-scheduler-1 value: 12

当您进行命令式思考时,您会期望运算符按声明顺序被调用,因此在另一个之下的一个将在以后被调用。这通常也适用于函数式/响应式(Reactive)编程,但情况并非总是如此。

在上面的例子中 doOnSubscribe实际上在管道中的其他任何东西之前运行。运营商首先收到有关订阅的通知,然后将订阅转发给其上游运营商,后者将其转发给其上游,依此类推……

所以事件发生的顺序是:
  • flux.subscribe()订阅管道并触发主线程上的执行
  • doOnSubscribe获取有关订阅的通知并在主线程上触发提供的使用者,然后将订阅转发到其上游
  • doOnNext收到有关订阅的通知,但它只对 onNext 感兴趣事件,因此除了将订阅转发到其上游(在主线程上)之外,它还没有做任何事情(!)
  • subscribeOn(s) - 上下文切换发生,此后的每个事件都在提供的调度程序的上下文中发生
  • 订阅转发给上游运营商:第一个 map然后 range , 都在平行线程
  • range是源,所以它发出第一项:onNext(1)平行线程
  • maponNext 上触发事件,进行转换并将转换后的元素作为 onNext 发送并行线程上的事件
  • subscribeOn除了转发 onNext 之外什么都不做 Activity
  • doOnNext消费者被触发,因为它收到了 onNext事件,仍在并行线程上
  • doOnSubscribe忽略 onNext事件,只是将事件转发给下一个运算符(operator)
  • onNext(2) 重复了 6-10 个事件项目
  • 流完成

  • TLDR :订阅事件从底部操作符传播到顶部操作符(doOnSubscribesusbcribeOn 在此传播过程中按此顺序触发),之后实际数据从顶部源操作符(range)反向流动) 至底部。

    doOnSubscribe之前运行 susbcribeOn ,它仍然会在主线程上运行。

    关于java - 为什么 subscribeOn 方法不切换上下文?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59553733/

    相关文章:

    java - 在 Java 中使用递归的字符串排列

    java - GUI、java、SWT 和世界地图表示

    spring - 在 Spring Webflux 功能应用程序中验证请求的最佳方法是什么

    spring - 为什么 Spring ReactiveMongoRepository 没有 Mono 的保存方法?

    java - 如何更新 Scala 中日期格式的列

    java - 如何自动缩放图形以使其适合 Y 轴?

    Java Flux 与 Observable/BehaviorSubject

    java - 是否可以混合使用 react 性代码,即 Mono 和 Flux 对象,它们是 react 性流以及 Java 8 Streams

    java - 使用 Reactor Mono 记录重试

    java - 将 java 应用程序数据冗余存储在文件中的最佳方法是什么?