我是响应式(Reactive)编程/项目 react 器的新手,试图理解这些概念。使用范围方法创建了一个 Flux 并订阅了。当我查看日志时,一切都在主线程上运行。
Flux
.range(1, 5)
.log()
.subscribe(System.out::println);
System.out.println("End of Execution");
[DEBUG] (main) Using Console logging [ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) [ INFO] (main) | request(unbounded) [ INFO] (main) | onNext(1) 1 [ INFO] (main) | onNext(2) 2 [ INFO] (main) | onNext(3) 3 [ INFO] (main) | onNext(4) 4 [ INFO] (main) | onNext(5) 5 [ INFO] (main) | onComplete() End of Execution
一旦 Publisher 完成了所有元素的发射,那么只有其余的代码被执行(System.out.println("End of Execution"); 在上面的例子中)。发布者会默认阻止线程吗?如果我更改调度程序,似乎它不会阻塞线程。
Flux
.range(1, 5)
.log()
.subscribeOn(Schedulers.elastic())
.subscribe(System.out::println);
System.out.println("End of Execution");
Thread.sleep(10000);
[DEBUG] (main) Using Console logging End of Execution [ INFO] (elastic-2) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) [ INFO] (elastic-2) | request(unbounded) [ INFO] (elastic-2) | onNext(1) 1 [ INFO] (elastic-2) | onNext(2) 2 [ INFO] (elastic-2) | onNext(3) 3 [ INFO] (elastic-2) | onNext(4) 4 [ INFO] (elastic-2) | onNext(5) 5 [ INFO] (elastic-2) | onComplete()
最佳答案
默认情况下,Reactor 不强制执行并发模型,是的,许多运算符(operator)将继续在发生 subscribe()
操作的 Thread
上工作。
但这并不意味着使用Reactor就会阻塞主线程。您展示的示例是在内存中工作,不涉及 I/O 或延迟。此外,它会立即订阅结果。
您可以尝试以下代码片段,看看有什么不同:
Flux.range(1, 5)
.delayElements(Duration.ofMillis(100))
.log()
.subscribe(System.out::println);
System.out.println("End of Execution");
在日志中,我看到:
INFO --- [main] reactor.Flux.ConcatMap.1 : onSubscribe(FluxConcatMap.ConcatMapImmediate)
INFO --- [main] reactor.Flux.ConcatMap.1 : request(unbounded)
End of Execution
在这种情况下,延迟元素将以不同的方式安排工作 - 由于这里没有任何东西可以使 JVM 保持事件状态,因此应用程序退出并且范围内的任何元素都不会被消耗。
在更常见的情况下,会涉及 I/O 和延迟,并且会以适当的方式安排工作,不会阻塞主应用程序线程。
关于reactive-programming - 主线程上的 Flux/Publisher,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53071609/