Project Reactor 提供了一种很好的方式来定义代码在哪个线程池上运行,通过定义 Scheduler
.它还为使用 CompletableFuture
的库提供了一座桥梁。的虽然Mono.fromFuture(..)
.
AWS 的 async client for DyanmoDB , 执行 CompletableFuture
是从对 java.util.concurrent.Executor
的 API 调用返回的吗? .默认情况下,它会创建一个 Executor
由它也创建的线程池支持。结果是即使流定义为 Scheduler
喜欢 Mono.fromFuture(..).subscribeOn(Schedulers.boundedElastic())
在库创建的池中的线程上执行,而不是在 Schedulers.boundedElastic()
中的线程上执行。 .所以我们看到像 sdk-async-response-0-2
这样的线程名称, 而不是像 boundedElastic-1
这样的名称.
幸运的是,该库允许我们提供自己的 Executor
作为 shown here ,所以我的问题是:
How do you build an
Executor
that uses a thread from theScheduler
defined on that part of the stream at runtime?
用例
我们有一个存储库类,它有一个
findById
方法,我们需要调用者能够控制哪个 Scheduler
继续运行,因为它在这些截然不同的上下文中使用:Schedulers.boundedElastic()
上运行的 API 响应调度器。 尝试
我们尝试定义
Executor
使用Schedulers.immediate()
和 Runnable::run
如此处所示,但两者都导致在 Netty 事件循环线程上执行(示例名称:aws-java-sdk-NettyEventLoop-0-2
),而不是定义的 Scheduler
中的线程.DynamoDbAsyncClient.builder()
.asyncConfiguration(builder -> builder.advancedOption(
SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
runnable -> Schedulers.immediate().schedule(runnable)
))
.build();
DynamoDbAsyncClient.builder()
.asyncConfiguration(builder -> builder.advancedOption(
SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
Runnable::run
))
.build();
最佳答案
第 1 部分。观察与订阅
调查这个问题,我认为需要 观察 在特定线程上执行后的元素。
准确地说,观察 在这种情况下,意味着*能够在某个特定线程上处理流中的值。在 RxJava 中,我们有一个正确的操作符,就像这样,但在 Project Reactor 中,我们将相同的操作称为 publishOn
.
因此,
* 如果要处理数据 * 在 Schedulers.boundedElastic()
那么你应该使用以下结构
Mono.fromFuture(..)
.publishOn(Schedulers.boundedElastic())
但是等等,
.subscribeOn
也有效???阅读前面的结构,您可能会开始担心,因为您 100% 确定
Mono.fromRunnable(..)
.subscribeOn(Schedulers.boundedElastic())
发送
onNext
在线程 boundedElastic-1
,那么相同的fromFuture
有什么问题? .这里有一个技巧:
切勿使用
subscribeOn
与 Futures
/CompletableFuture
或任何可以在下面使用自己的异步机制的东西如果我们看看
subscribeOn
背后发生了什么,您会发现类似以下内容:// Simplified version of SubscribeOn operator
@Override
public void subscribe(CoreSubscriber<? super T> actual) {
Scheduler scheduler;
Publisher<T> parent;
scheduler.schedule(() -> parent.subscribe(actual));
}
这基本上意味着 parent 的
subscribe
方法将在单独的线程上调用。这种技术适用于
fromRunnable
, fromSupplier
, fromCallable
因为他们的逻辑发生在 subscribe
方法:@Override
public void subscribe(CoreSubscriber<? super T> actual) {
Operators.MonoSubscriber<T, T>
sds = new Operators.MonoSubscriber<>(actual);
actual.onSubscribe(sds);
// skiped some parts
T t = supplier.get();
if (t == null) {
sds.onComplete();
}
else {
sds.complete(t);
}
}
这意味着它几乎等于
scheduler.schedule(() -> {
T t = supplier.get();
if (t == null) {
sds.onComplete();
}
else {
sds.complete(t);
}
})
相比之下,
fromFuture
工作更棘手。一个简短的测验。
我们可以在哪个线程上观察到一个值? (假设在 Main 线程上执行,任务在 ForkJoinPool 上执行)
var future = CompletableFuture
.supplyAsync(() -> {
return value;
})
... // some code here, does not metter just code
future.thenAccept(value -> {
System.out.println(Thread.currentThread())
});
以及正确的答案......🥁🥁🥁🥁🥁🥁
可能是主线程
或者它可能是来自 ForkJoinPool 的 Thread
...
因为它是活泼的......并且在这一点上,我们消费值(value),值(value)可能已经交付,所以我们只是阅读
volatile
阅读器线程(线程 Main)上的字段,否则,线程 Main 将设置 acceptor
所以稍后会在 ForkJoinPool
上调用接受器线。对,这就是为什么当你使用
fromFuture
与 subscribeOn
, 不保证 subscribeOn
线程将观察给定 CompletableFuture
的值.这就是为什么
publishOn
是确保值处理发生在所需线程上的唯一方法。好的,我应该使用
publishOn
一直往下???是和不是。这取决于。
如果您使用
Mono
- 在 99% 的情况下,您可以使用 publishOn
如果您想确保您的数据处理发生在特定线程上 - 请始终使用 publishOn
.不用担心潜在的开销 , Project Reactor 会照顾你,即使你不小心使用了它。 Project Reactor 有几个优化可以取代你的
publishOn
与 subscribeOn
(如果它在不破坏行为的情况下是安全的)在运行时,所以你会得到最好的。第 2 部分。跌入
Scheduelr
的兔子洞s永远不要使用
Schedulers.immediate()
它几乎是无操作调度程序,基本上可以
Schedulers.immediate().scheduler(runnable) {
runnable.run()
}
对,它对 react 堆用户没有任何用处,我们仅将其用于内部需求。
好的,那么我如何使用调度程序在命令式世界中作为执行程序使用它
有两种选择:
快速路径:分步指南
1.a) 创建有界
Executor
. (例如 Executors.fixed...
)1.b) 创建有界
ScheduledExecutorService
如果你想获得周期性任务和延迟任务的力量2) 创建一个
Scheduler
从您的执行人使用 Schedulers.fromExecutorXXX
API3) 使用您的有界
Executor
在命令式世界中,使用您的 Scheduler
它是 react 世界的有界包装漫长的道路
快来了...
第 3 部分。如何序列化执行。
快来了
关于java - 如何将 Project Reactor 的调度程序与基于 Executor 的库一起使用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60783347/