reactive-programming - 主线程上的 Flux/Publisher

标签 reactive-programming spring-webflux project-reactor

我是响应式(Reactive)编程/项目 react 器的新手,试图理解这些概念。使用范围方法创建了一个 Flux 并订阅了。当我查看日志时,一切都在主线程上运行。

     Flux
        .range(1, 5)
        .log()
        .subscribe(System.out::println);

    System.out.println("End of Execution");

[DEBUG] (main) Using Console logging [ INFO] (main) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) [ INFO] (main) | request(unbounded) [ INFO] (main) | onNext(1) 1 [ INFO] (main) | onNext(2) 2 [ INFO] (main) | onNext(3) 3 [ INFO] (main) | onNext(4) 4 [ INFO] (main) | onNext(5) 5 [ INFO] (main) | onComplete() End of Execution

一旦 Publisher 完成了所有元素的发射,那么只有其余的代码被执行(System.out.println("End of Execution"); 在上面的例子中)。发布者会默认阻止线程吗?如果我更改调度程序,似乎它不会阻塞线程。

Flux
        .range(1, 5)
        .log()
        .subscribeOn(Schedulers.elastic())
        .subscribe(System.out::println);
    System.out.println("End of Execution");
    Thread.sleep(10000);

[DEBUG] (main) Using Console logging End of Execution [ INFO] (elastic-2) | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription) [ INFO] (elastic-2) | request(unbounded) [ INFO] (elastic-2) | onNext(1) 1 [ INFO] (elastic-2) | onNext(2) 2 [ INFO] (elastic-2) | onNext(3) 3 [ INFO] (elastic-2) | onNext(4) 4 [ INFO] (elastic-2) | onNext(5) 5 [ INFO] (elastic-2) | onComplete()

最佳答案

默认情况下,Reactor 不强制执行并发模型,是的,许多运算符(operator)将继续在发生 subscribe() 操作的 Thread 上工作。

但这并不意味着使用Reactor就会阻塞主线程。您展示的示例是在内存中工作,不涉及 I/O 或延迟。此外,它会立即订阅结果。

您可以尝试以下代码片段,看看有什么不同:

Flux.range(1, 5)
    .delayElements(Duration.ofMillis(100))
    .log()
    .subscribe(System.out::println);
System.out.println("End of Execution");

在日志中,我看到:

INFO   --- [main] reactor.Flux.ConcatMap.1 : onSubscribe(FluxConcatMap.ConcatMapImmediate)
INFO   --- [main] reactor.Flux.ConcatMap.1 : request(unbounded)
End of Execution

在这种情况下,延迟元素将以不同的方式安排工作 - 由于这里没有任何东西可以使 JVM 保持事件状态,因此应用程序退出并且范围内的任何元素都不会被消耗。

在更常见的情况下,会涉及 I/O 和延迟,并且会以适当的方式安排工作,不会阻塞主应用程序线程。

关于reactive-programming - 主线程上的 Flux/Publisher,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53071609/

相关文章:

java - 如何获取导致 Flux 异常的元素?

spring - 为什么默认配置的spring webflux中没有异常堆栈跟踪?

angular - 如何轻松地将 Observable 转换或分配给 Behavior Subject,以便其他组件可以共享它

c# - 响应式扩展 - 以同步方式刷新 Subject/IObservable

python - RxPY 中带有 from_iterable/range 的 subscribe_on

spring-webflux - Spring webflux block、flatmap 和 subscribe 的区别

spring - 如何在使用响应式(Reactive)数据源的 WebFlux 上编写自定义验证器

java - 将 Scala Future 转换为 Reactor Flux

ssl - 无法使用 Spring Data Reactive 和 Spring Boot 2.0 连接到 mongoDB

java - 延迟后缓存 Mono<>