关于 subscribeOn 有很多问题,理论上它很简单:它使订阅者进程与特定的 Scheduler 一起运行。但是当我运行这个时:
@Test
void fluxWithSubscribeOnTest() {
final Scheduler s = Schedulers.newParallel("parallel", 10);
final Publisher<Integer> integerFlux =
Flux
.range(1, 1000)
.doOnNext(integer -> log.info("executed"))
.subscribeOn(s);
StepVerifier.create(integerFlux)
.expectNextCount(1000)
.verifyComplete();
}
我看到所有 1000 个日志都写入同一个并行 1 线程
18:39:47.612 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed
18:39:47.613 [parallel-1] INFO com.performance.TestClass - executed
我希望上面的代码能够在 subscribeOn 提供的池中的多个线程中进行记录(因此执行 onNext)。 ParallelFlux 以这种方式工作,并且正如预期的那样,它记录在不同的线程中:
@Test
void parallelFluxTest() {
final Scheduler s = Schedulers.newParallel("parallel", 10);
final Publisher<Integer> integerFlux =
Flux
.range(1, 1000)
.parallel(10)
.runOn(s)
.doOnNext(integer -> log.info("executed"));
StepVerifier.create(integerFlux)
.expectNextCount(1000)
.verifyComplete();
}
18:43:42.377 [parallel-1] INFO com.performance.TestClass - executed
18:43:42.377 [parallel-2] INFO com.performance.TestClass - executed
18:43:42.377 [parallel-3] INFO com.performance.TestClass - executed
18:43:42.377 [parallel-4] INFO com.performance.TestClass - executed
18:43:42.378 [parallel-5] INFO com.performance.TestClass - executed
有人可以解释为什么 subscribeOn 示例仅使用一个线程进行日志记录,而池大小为 10 吗?
最佳答案
我认为当您使用 Scheduler.parallel()
或 Scheduler.newParallel()
时,您会因为“并行”这个词而感到困惑
在这种情况下,“并行”并不意味着它会神奇地使您的处理默认并行。
这意味着调度程序有固定的线程池,用于执行一些 CPU 密集型操作,并且适合并行工作。
建议使用此调度程序来执行 CPU 密集型操作。
关于Flux.range()
:此方法旨在发出从头开始的计数递增整数序列。在第一种情况下,这是一个顺序操作,它会在您订阅通量的线程上发出数字。除非将 parallel()
与 runOn()
关于java - SubscribeOn 仅使用池中的 1 个线程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/75665561/