reactive-programming - Project Reactor 3 中的 publishOn 与 subscribeOn

标签 reactive-programming publish-subscribe project-reactor publisher reactive-streams

我在相同的通量上使用 publishOn 和 subscribeOn ,如下所示:

    System.out.println("*********Calling Concurrency************");
    List<Integer> elements = new ArrayList<>();
    Flux.just(1, 2, 3, 4)
      .map(i -> i * 2)
      .log()
      .publishOn(Schedulers.elastic())
      .subscribeOn(Schedulers.parallel())
      .subscribe(elements::add);
    System.out.println("-------------------------------------");

虽然,当我同时使用两者时,日志中不会打印任何内容。
但是当我只使用 publishOn 时,我得到了以下信息日志:
*********Calling Concurrency************
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(256)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------

是否比 subscribeOn 更推荐 publishOn?或者它比 subscribeOn 有更多的偏好?两者有什么区别,什么时候用哪个?

最佳答案

我花了一些时间才理解它,也许是因为publishOn通常在 subscribeOn 之前解释,这是一个希望更简单的外行解释。
subscribeOn表示运行初始源发射,例如 subscribe(), onSubscribe() and request()在指定的调度程序工作人员(其他线程)上,对于任何后续操作也相同,例如 onNext/onError/onComplete, map etc并且无论 subscribeOn() 的位置如何,都会发生这种行为

如果你没有做任何publishOn在流利的调用中就是这样,一切都将在这样的线程上运行。

但是,只要您调用 publishOn()假设在中间,那么任何后续运算符(operator)调用都将在提供的调度程序工作人员上运行到这样的 publishOn() .

这是一个例子

Consumer<Integer> consumer = s -> System.out.println(s + " : " + Thread.currentThread().getName());

Flux.range(1, 5)
        .doOnNext(consumer)
        .map(i -> {
          System.out.println("Inside map the thread is " + Thread.currentThread().getName());
          return i * 10;
        })
        .publishOn(Schedulers.newElastic("First_PublishOn()_thread"))
        .doOnNext(consumer)
        .publishOn(Schedulers.newElastic("Second_PublishOn()_thread"))
        .doOnNext(consumer)
        .subscribeOn(Schedulers.newElastic("subscribeOn_thread"))
        .subscribe();

结果将是

1 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
2 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
10 : First_PublishOn()_thread-6
3 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
20 : First_PublishOn()_thread-6
4 : subscribeOn_thread-4
10 : Second_PublishOn()_thread-5
30 : First_PublishOn()_thread-6
20 : Second_PublishOn()_thread-5
Inside map the thread is subscribeOn_thread-4
30 : Second_PublishOn()_thread-5
5 : subscribeOn_thread-4
40 : First_PublishOn()_thread-6
Inside map the thread is subscribeOn_thread-4
40 : Second_PublishOn()_thread-5
50 : First_PublishOn()_thread-6
50 : Second_PublishOn()_thread-5


如您所见,第一个 doOnNext()和以下 map()在名为 subscribeOn_thread 的线程上运行,直到任何 publishOn()调用,然后任何后续调用将在提供的调度程序上运行到该 publishOn()这将再次发生在任何后续调用中,直到有人调用另一个 publishOn() .

关于reactive-programming - Project Reactor 3 中的 publishOn 与 subscribeOn,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48073315/

相关文章:

nservicebus - 在 nServiceBus 中订阅来自多个发布者的消息

javascript - 从 Node.js 订阅 SalesForce 主题时出错

带有 WebFlux 的 Spring Boot 总是在测试中抛出 403 状态

javascript - 为什么当 react 变量改变值时这个函数不运行?

c# - 在 UniRX 中将一个 Observable 映射到另一个不同类型的 Observable

c++ - 从 Turtlesim 订阅和发布几何/扭曲消息

spring-boot - 如何并行调用多个Spring Webclient并等待结果?

java - 为什么 Reactor Mono<Void> 被识别为一个空的 Mono?

javascript - 如何在 Vue.js 中使两个输入相互 react

java - RxJava : dynamically create Observables and send the final resut as Observable