java - Mono 与 CompletableFuture

标签 java reactive-programming project-reactor completable-future

CompletableFuture 在单独的线程上执行任务(使用线程池)并提供回调函数。假设我在 CompletableFuture 中有一个 API 调用。这是 API 调用阻塞吗?线程会被阻塞直到它没有从 API 得到响应吗? (我知道主线程/tomcat线程将是非阻塞的,但是正在执行CompletableFuture任务的线程呢?)

据我所知,Mono 是完全非阻塞的。

请阐明这一点,如果我错了,请纠正我。

最佳答案

CompletableFuture 是异步的。但它是非阻塞的吗?

关于 CompletableFuture 的一个事实是它是真正的异步,它允许您从调用者线程和 API(例如 thenXXX)异步运行任务。允许您在结果可用时对其进行处理。另一方面,CompletableFuture并不总是非阻塞的。例如,当您运行以下代码时,它将在默认的 ForkJoinPool 上异步执行。 :

CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    }
    catch (InterruptedException e) {

    }

    return 1;
});

很明显 ThreadForkJoinPool执行任务的函数最终会被阻塞,这意味着我们不能保证调用是非阻塞的。

另一方面,CompletableFuture公开 API,使您能够真正做到非阻塞。

例如,您始终可以执行以下操作:

public CompletableFuture myNonBlockingHttpCall(Object someData) {
    var uncompletedFuture = new CompletableFuture(); // creates uncompleted future

    myAsyncHttpClient.execute(someData, (result, exception -> {
        if(exception != null) {
            uncompletedFuture.completeExceptionally(exception);
            return;
        }
        uncompletedFuture.complete(result);
    })

    return uncompletedFuture;
}

如您所见,CompletableFuture的API future 为您提供completecompleteExceptionally方法可以在需要时完成执行,而不会阻塞任何线程。

Mono 与 CompletableFuture

在上一节中,我们概述了 CF 的行为,但是 CompletableFuture 和 Mono 之间的主要区别是什么?

值得一提的是,我们也可以阻止 Mono。没有人阻止我们编写以下内容:

Mono.fromCallable(() -> {
    try {
        Thread.sleep(1000);
    }
    catch (InterruptedException e) {

    }

    return 1;
})

当然,一旦我们订阅了 future,调用者线程就会被阻塞。但我们总是可以通过提供额外的 subscribeOn 来解决这个问题。运算符(operator)。尽管如此,更广泛的 API Mono不是关键功能。

为了了解 CompletableFuture 之间的主要区别和Mono ,让我们回到前面提到的myNonBlockingHttpCall方法实现。

public CompletableFuture myUpperLevelBusinessLogic() {
    var future = myNonBlockingHttpCall();

    // ... some code

    if (something) {
       // oh we don't really need anything, let's just throw an exception
       var errorFuture = new CompletableFuture();
       errorFuture.completeExceptionally(new RuntimeException());

       return errorFuture;
    }

   return future;
}

CompletableFuture为例,一旦调用该方法,它将急切地执行对另一个服务/资源的 HTTP 调用。即使在验证某些前置/后置条件后我们并不真正需要执行结果,它也会开始执行,并且将为这项工作分配额外的 CPU/DB-Connections/What-Ever-Machine-Resources。

相比之下,Mono根据定义,类型是惰性的:

public Mono myNonBlockingHttpCallWithMono(Object someData) {
    return Mono.create(sink -> {
            myAsyncHttpClient.execute(someData, (result, exception -> {
                if(exception != null) {
                    sink.error(exception);
                    return;
                }
                sink.success(result);
            })
    });
} 

public Mono myUpperLevelBusinessLogic() {
    var mono = myNonBlockingHttpCallWithMono();

    // ... some code

    if (something) {
       // oh we don't really need anything, let's just throw an exception

       return Mono.error(new RuntimeException());
    }

   return mono;
}

在这种情况下,直到最后mono都不会发生任何事情。已订阅。因此,仅当Mono时由 myNonBlockingHttpCallWithMono 返回方法,将被订阅,逻辑提供给Mono.create(Consumer)将被执行。

我们还可以走得更远。我们可以让我们的执行更加懒惰。您可能知道,Mono延伸Publisher来自 react 流规范。 Reactive Streams 的一大特色是背压支持。因此,使用 Mono仅当确实需要数据并且我们的订阅者准备好使用它们时,我们才能执行 API:

Mono.create(sink -> {
    AtomicBoolean once = new AtomicBoolean();
    sink.onRequest(__ -> {
        if(!once.get() && once.compareAndSet(false, true) {
            myAsyncHttpClient.execute(someData, (result, exception -> {
                if(exception != null) {
                    sink.error(exception);
                    return;
                }
                sink.success(result);
            });
        }
    });
});

在此示例中,我们仅在订阅者调用 Subscription#request 时才执行数据因此,它通过这样做来声明已准备好接收数据。

摘要

  • CompletableFuture是异步的并且可以是非阻塞的
  • CompletableFuture很渴望。你不能推迟执行。但您可以取消它们(这比没有好)
  • Mono是异步/非阻塞的,可以轻松执行不同的任何调用 Thread通过编写主要Mono与不同的运营商。
  • Mono确实是懒惰的,并且允许通过订阅者的存在及其消费数据的准备情况来推迟执行启动。

关于java - Mono 与 CompletableFuture,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54866391/

相关文章:

java - 找不到 PreferenceScreen 类

javascript - 修复此测试 : Functional-Reactive Programming Tutorial

java - WebClient 的 bodyToMono 错误

project-reactor - 如何监控 Flux.onBackPressureBuffer() 队列大小

java - 无法启动应用程序 - java.lang.NullPointerException

java - 如何在android studio中将堆栈从一个 Activity 传递到另一个 Activity

angular - 如何轻松地将 Observable 转换或分配给 Behavior Subject,以便其他组件可以共享它

ios - 使用 Bond 定期触发一个方法

spring-webflux - 我什么时候使用 Mono.flatMapIterable 和 Mono.flapMapMany?

Java 8 -- 如何使用 xsd :duration 计算从现在开始的(毫秒)秒