java - Project Reactor 并行执行

标签 java reactive-programming project-reactor

Project Reactor 3.1.5.RELEASE

考虑一下:

Flux.range(0, 10)
    .publishOn(Schedulers.parallel())
    .subscribe(i -> LOG.info(i));

我期望订阅者在多个线程中运行,但它只在一个线程中运行:

2018-03-26 12:30:08.693  INFO 89770 --- [     parallel-1] d.a.Application    : 0
2018-03-26 12:30:08.693  INFO 89770 --- [     parallel-1] d.a.Application    : 1
2018-03-26 12:30:08.693  INFO 89770 --- [     parallel-1] d.a.Application    : 2
2018-03-26 12:30:08.693  INFO 89770 --- [     parallel-1] d.a.Application    : 3
2018-03-26 12:30:08.694  INFO 89770 --- [     parallel-1] d.a.Application    : 4
2018-03-26 12:30:08.694  INFO 89770 --- [     parallel-1] d.a.Application    : 5
2018-03-26 12:30:08.694  INFO 89770 --- [     parallel-1] d.a.Application    : 6
2018-03-26 12:30:08.694  INFO 89770 --- [     parallel-1] d.a.Application    : 7
2018-03-26 12:30:08.694  INFO 89770 --- [     parallel-1] d.a.Application    : 8
2018-03-26 12:30:08.694  INFO 89770 --- [     parallel-1] d.a.Application    : 9

文档告诉我的期望是正确的 ( http://projectreactor.io/docs/core/release/reference/#threading )。有人可以向我解释那里发生了什么吗?

最佳答案

react 流本质上是顺序的,publishOn 只是告诉源在哪里一个接一个地发出每个值。您需要通过 parallel 告诉流程并行运行,然后通过 runOn 指定调度程序:

Flux.range(0, 10)
.parallel()
.runOn(Schedulers.parallel())
.doOnNext(i -> LOG.info(i))
.sequential()
.subscribe();

关于java - Project Reactor 并行执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49489348/

相关文章:

java - 删除应用程序不同 session 之间的插件首选项

java - 如何检查用户意外终止连接

reactive-programming - 如何在 RxJava 中计算移动平均值

java - Reactor 和 Webflux 中的 Mono 链的超时运算符究竟测量了什么?

spring - 真的有必要将 Hystrix 与响应式(Reactive) spring boot 2 应用程序一起使用吗?

java - 如何阻止和订阅 Flux\mono

java - Spring Boot 2.5.0 对于 Jackson Kotlin 类支持,请将 "com.fasterxml.jackson.module: jackson-module-kotlin"添加到类路径

javascript - 创建一个元史诗,其工作方式类似于combineEpics,但根据 namespace 选择一个史诗

java - 如何在 Spring WebFlux 中配置背压?

java - 如何使用 ExpectJ 工具在 java 代码中运行 Unix shell 脚本?