CompletableFuture
在单独的线程上执行任务(使用线程池)并提供回调函数。假设我在 CompletableFuture
中有一个 API 调用。这是 API 调用阻塞吗?线程会被阻塞直到它没有从 API 得到响应吗? (我知道主线程/tomcat线程将是非阻塞的,但是正在执行CompletableFuture任务的线程呢?)
据我所知,Mono 是完全非阻塞的。
请阐明这一点,如果我错了,请纠正我。
最佳答案
CompletableFuture 是异步的。但它是非阻塞的吗?
关于 CompletableFuture 的一个事实是它是真正的异步,它允许您从调用者线程和 API(例如 thenXXX
)异步运行任务。允许您在结果可用时对其进行处理。另一方面,CompletableFuture
并不总是非阻塞的。例如,当您运行以下代码时,它将在默认的 ForkJoinPool
上异步执行。 :
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
}
return 1;
});
很明显 Thread
在ForkJoinPool
执行任务的函数最终会被阻塞,这意味着我们不能保证调用是非阻塞的。
另一方面,CompletableFuture
公开 API,使您能够真正做到非阻塞。
例如,您始终可以执行以下操作:
public CompletableFuture myNonBlockingHttpCall(Object someData) {
var uncompletedFuture = new CompletableFuture(); // creates uncompleted future
myAsyncHttpClient.execute(someData, (result, exception -> {
if(exception != null) {
uncompletedFuture.completeExceptionally(exception);
return;
}
uncompletedFuture.complete(result);
})
return uncompletedFuture;
}
如您所见,CompletableFuture
的API future 为您提供complete
和completeExceptionally
方法可以在需要时完成执行,而不会阻塞任何线程。
Mono 与 CompletableFuture
在上一节中,我们概述了 CF 的行为,但是 CompletableFuture 和 Mono 之间的主要区别是什么?
值得一提的是,我们也可以阻止 Mono。没有人阻止我们编写以下内容:
Mono.fromCallable(() -> {
try {
Thread.sleep(1000);
}
catch (InterruptedException e) {
}
return 1;
})
当然,一旦我们订阅了 future,调用者线程就会被阻塞。但我们总是可以通过提供额外的 subscribeOn
来解决这个问题。运算符(operator)。尽管如此,更广泛的 API Mono
不是关键功能。
为了了解 CompletableFuture
之间的主要区别和Mono
,让我们回到前面提到的myNonBlockingHttpCall
方法实现。
public CompletableFuture myUpperLevelBusinessLogic() {
var future = myNonBlockingHttpCall();
// ... some code
if (something) {
// oh we don't really need anything, let's just throw an exception
var errorFuture = new CompletableFuture();
errorFuture.completeExceptionally(new RuntimeException());
return errorFuture;
}
return future;
}
以CompletableFuture
为例,一旦调用该方法,它将急切地执行对另一个服务/资源的 HTTP 调用。即使在验证某些前置/后置条件后我们并不真正需要执行结果,它也会开始执行,并且将为这项工作分配额外的 CPU/DB-Connections/What-Ever-Machine-Resources。
相比之下,Mono
根据定义,类型是惰性的:
public Mono myNonBlockingHttpCallWithMono(Object someData) {
return Mono.create(sink -> {
myAsyncHttpClient.execute(someData, (result, exception -> {
if(exception != null) {
sink.error(exception);
return;
}
sink.success(result);
})
});
}
public Mono myUpperLevelBusinessLogic() {
var mono = myNonBlockingHttpCallWithMono();
// ... some code
if (something) {
// oh we don't really need anything, let's just throw an exception
return Mono.error(new RuntimeException());
}
return mono;
}
在这种情况下,直到最后mono
都不会发生任何事情。已订阅。因此,仅当Mono
时由 myNonBlockingHttpCallWithMono
返回方法,将被订阅,逻辑提供给Mono.create(Consumer)
将被执行。
我们还可以走得更远。我们可以让我们的执行更加懒惰。您可能知道,Mono
延伸Publisher
来自 react 流规范。 Reactive Streams 的一大特色是背压支持。因此,使用 Mono
仅当确实需要数据并且我们的订阅者准备好使用它们时,我们才能执行 API:
Mono.create(sink -> {
AtomicBoolean once = new AtomicBoolean();
sink.onRequest(__ -> {
if(!once.get() && once.compareAndSet(false, true) {
myAsyncHttpClient.execute(someData, (result, exception -> {
if(exception != null) {
sink.error(exception);
return;
}
sink.success(result);
});
}
});
});
在此示例中,我们仅在订阅者调用 Subscription#request
时才执行数据因此,它通过这样做来声明已准备好接收数据。
摘要
-
CompletableFuture
是异步的并且可以是非阻塞的 -
CompletableFuture
很渴望。你不能推迟执行。但您可以取消它们(这比没有好) -
Mono
是异步/非阻塞的,可以轻松执行不同的任何调用Thread
通过编写主要Mono
与不同的运营商。 -
Mono
确实是懒惰的,并且允许通过订阅者的存在及其消费数据的准备情况来推迟执行启动。
关于java - Mono 与 CompletableFuture,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54866391/