java - SubscribeOn 仅使用池中的 1 个线程

标签 java spring-webflux project-reactor

关于 subscribeOn 有很多问题,理论上它很简单:它使订阅者进程与特定的 Scheduler 一起运行。但是当我运行这个时:

    @Test
void fluxWithSubscribeOnTest() {
    final Scheduler s = Schedulers.newParallel("parallel", 10);
    final Publisher<Integer> integerFlux =
            Flux
                    .range(1, 1000)
                    .doOnNext(integer -> log.info("executed"))
                    .subscribeOn(s);
    StepVerifier.create(integerFlux)
            .expectNextCount(1000)
            .verifyComplete();
}

我看到所有 1000 个日志都写入同一个并行 1 线程

18:39:47.612 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed

我希望上面的代码能够在 subscribeOn 提供的池中的多个线程中进行记录(因此执行 onNext)。 ParallelFlux 以这种方式工作,并且正如预期的那样,它记录在不同的线程中:

    @Test
void parallelFluxTest() {
    final Scheduler s = Schedulers.newParallel("parallel", 10);
    final Publisher<Integer> integerFlux =
            Flux
                    .range(1, 1000)
                    .parallel(10)
                    .runOn(s)
                    .doOnNext(integer -> log.info("executed"));
    StepVerifier.create(integerFlux)
            .expectNextCount(1000)
            .verifyComplete();
}

18:43:42.377 [parallel-1] INFO com.performance.TestClass - executed
18:43:42.377 [parallel-2] INFO com.performance.TestClass - executed
18:43:42.377 [parallel-3] INFO com.performance.TestClass - executed
18:43:42.377 [parallel-4] INFO com.performance.TestClass - executed
18:43:42.378 [parallel-5] INFO com.performance.TestClass - executed

有人可以解释为什么 subscribeOn 示例仅使用一个线程进行日志记录,而池大小为 10 吗?

最佳答案

我认为当您使用 Scheduler.parallel()Scheduler.newParallel() 时,您会因为“并行”这个词而感到困惑

在这种情况下,“并行”并不意味着它会神奇地使您的处理默认并行。

这意味着调度程序有固定的线程池,用于执行一些 CPU 密集型操作,并且适合并行工作。

建议使用此调度程序来执行 CPU 密集型操作。

关于Flux.range():此方法旨在发出从头开始的计数递增整数序列。在第一种情况下,这是一个顺序操作,它会在您订阅通量的线程上发出数字。除非将 parallel()runOn()

一起使用,否则不会有任何并行工作

关于java - SubscribeOn 仅使用池中的 1 个线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75665561/

相关文章:

java - 我如何在整个 Activity 中传递实例变量?

从 LaTeX 源创建 PDF 的 Java 库

java - Jsoup 设置接受语言

java - Flux to List<Objects> 无阻塞

java - 我怎样才能让我的测试版过期?

java - WebClient 请求级别超时抛出称为默认 onErrorDropped 的运算符

java - 从 Mono.first 发出第一个成功结果

spring-webflux - Spring 5 RouterFunction 状态 415,原因 "Content type ' application/json' 不支持”

java - 从 Flux<Integer> 中分块读取

java - 为什么 subscribeOn 方法不切换上下文?