spring - Reactor Mono 与 CompletableFuture

标签 spring java-8 project-reactor spring-webflux

刚刚开始探索reactor项目及其抽象Mono和Flux,并想了解与java 8准系统CompletableFuture的基本区别。

这是我的一个简单代码:

public static void main(String[] args) throws Exception {

    Mono.fromCallable(() -> getData())
            .map(s -> s + " World ")
            .subscribe(s -> System.out.println(s));

    CompletableFuture.supplyAsync(() -> getData())
            .thenAccept(System.out::println);

    System.out.println(Thread.currentThread()+" End ");
}

private static String getData() {

    int j=0;

    for(int i=0; i<Integer.MAX_VALUE; i++){
        j = j - i%2;
    }

    System.out.println(Thread.currentThread()+" - "+j);
    return " Hello ";
}

首先,CompletableFuture 并不令人意外。 supplyAsync 安排函数通过 ForkJoinPool 执行,“End”行立即打印,程序终止,因为主线程在这里的生命周期非常短暂 - 正如预期的那样。

但是 Mono.fromCallable(...) 阻塞了主线程。此外,在 getData() 函数中打印的线程名称是主线程。所以我看到的是顺序/阻塞行为,而不是顺序/非阻塞(异步)行为。是否因为我在同一线程上应用了订阅函数,它被阻塞了?有人可以解释一下吗?

最佳答案

Is it because I had applied a subscribe function on the same thread, it is blocking?

这似乎正是发生的事情。

这种特定行为让我有点惊讶,因为它不是大多数管道的行为方式。大多数管道都有一种或另一种方式的一些操作,使管道异步。 publishOnsubscribeOn 是明显的例子,但也是 flatMap might have such an effect可能还有许多其他人。在这些情况下,订阅将立即返回。

这暗示了响应式(Reactive)编程的一个非常重要的点:管道不应包含长阻塞调用。响应式(Reactive)管道旨在准备好并在订阅时处理事件而不会阻塞。因此,阻塞语句具有阻塞整个执行的真正潜力。通过使用Scheduler,您可以将此类调用限制在特殊的线程池中,从而控制其效果。

关于spring - Reactor Mono 与 CompletableFuture,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47571345/

相关文章:

java - 使用 Reactor Mono 记录重试

java - 将Treecache集群属性存储在属性文件中

java - 模拟 mvc 期望异常

multithreading - CompletableFuture、supplyAsync() 和 thenApply()

java - 有没有办法在一行中将分组列表连接到 Java 8 中的集合中?

java - 使用lambda在Java流中调用带有参数的构造函数

java - Spring 5.0.0.M3 调用 ApplicationEventListener 时出错 : No ServletContext set - Exception encountered during context initialization

java - Apache CXF 身份验证 + Spring Security

java - 暂停 kafka 在响应式(Reactive) Kafka 中读取

project-reactor - Reactor 3.x - 限制 groupBy Flux 的时间