java - 如何将 Project Reactor 的调度程序与基于 Executor 的库一起使用?

标签 java apache-kafka amazon-dynamodb project-reactor aws-sdk-java-2.0

Project Reactor 提供了一种很好的方式来定义代码在哪个线程池上运行,通过定义 Scheduler .它还为使用 CompletableFuture 的库提供了一座桥梁。的虽然Mono.fromFuture(..) .

AWS 的 async client for DyanmoDB , 执行 CompletableFuture是从对 java.util.concurrent.Executor 的 API 调用返回的吗? .默认情况下,它会创建一个 Executor由它也创建的线程池支持。结果是即使流定义为 Scheduler喜欢 Mono.fromFuture(..).subscribeOn(Schedulers.boundedElastic())在库创建的池中的线程上执行,而不是在 Schedulers.boundedElastic() 中的线程上执行。 .所以我们看到像 sdk-async-response-0-2 这样的线程名称, 而不是像 boundedElastic-1 这样的名称.

幸运的是,该库允许我们提供自己的 Executor作为 shown here ,所以我的问题是:

How do you build an Executor that uses a thread from the Scheduler defined on that part of the stream at runtime?



用例

我们有一个存储库类,它有一个 findById方法,我们需要调用者能够控制哪个 Scheduler继续运行,因为它在这些截然不同的上下文中使用:
  • Schedulers.boundedElastic() 上运行的 API 响应调度器。
  • 处理从定义的调度程序按顺序在每个分区的线程上执行的 Kafka 消息,如 Reactor Kafka docs 所示。 .

  • 尝试

    我们尝试定义 Executor使用Schedulers.immediate()Runnable::run如此处所示,但两者都导致在 Netty 事件循环线程上执行(示例名称:aws-java-sdk-NettyEventLoop-0-2),而不是定义的 Scheduler 中的线程.
    DynamoDbAsyncClient.builder()
        .asyncConfiguration(builder -> builder.advancedOption(
            SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
            runnable -> Schedulers.immediate().schedule(runnable)
        ))
        .build();
    
    DynamoDbAsyncClient.builder()
        .asyncConfiguration(builder -> builder.advancedOption(
            SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
            Runnable::run
        ))
        .build();
    

    最佳答案

    第 1 部分。观察与订阅

    调查这个问题,我认为需要 观察 在特定线程上执行后的元素。
    准确地说,观察 在这种情况下,意味着*能够在某个特定线程上处理流中的值。在 RxJava 中,我们有一个正确的操作符,就像这样,但在 Project Reactor 中,我们将相同的操作称为 publishOn .

    因此,
    * 如果要处理数据 * Schedulers.boundedElastic()那么你应该使用以下结构

    Mono.fromFuture(..)
        .publishOn(Schedulers.boundedElastic())
    

    但是等等,.subscribeOn也有效???

    阅读前面的结构,您可能会开始担心,因为您 100% 确定
    Mono.fromRunnable(..)
        .subscribeOn(Schedulers.boundedElastic())
    

    发送 onNext在线程 boundedElastic-1 ,那么相同的fromFuture有什么问题? .

    这里有一个技巧:

    切勿使用 subscribeOnFutures/CompletableFuture或任何可以在下面使用自己的异步机制的东西

    如果我们看看 subscribeOn 背后发生了什么,您会发现类似以下内容:
    //  Simplified version of SubscribeOn operator
    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
        Scheduler scheduler;
        Publisher<T> parent;
        scheduler.schedule(() -> parent.subscribe(actual));
    }
    

    这基本上意味着 parent 的subscribe方法将在单独的线程上调用。

    这种技术适用于 fromRunnable , fromSupplier , fromCallable因为他们的逻辑发生在 subscribe方法:
    @Override
    public void subscribe(CoreSubscriber<? super T> actual) {
        Operators.MonoSubscriber<T, T>
        sds = new Operators.MonoSubscriber<>(actual);
    
        actual.onSubscribe(sds);
        // skiped some parts 
        T t = supplier.get();
        if (t == null) {
            sds.onComplete();
        }
        else {
            sds.complete(t);
        }
    }
    

    这意味着它几乎等于
    scheduler.schedule(() -> {
        T t = supplier.get();
        if (t == null) {
            sds.onComplete();
        }
        else {
            sds.complete(t);
        }
    })
    

    相比之下,fromFuture工作更棘手。
    一个简短的测验。

    我们可以在哪个线程上观察到一个值? (假设在 Main 线程上执行,任务在 ForkJoinPool 上执行)
    var future = CompletableFuture
    .supplyAsync(() -> {
      return value;
    })
    ... // some code here, does not metter just code
    
    future.thenAccept(value -> {
      System.out.println(Thread.currentThread())
    });
    

    以及正确的答案......🥁🥁🥁🥁🥁🥁

    可能是主线程
    或者它可能是来自 ForkJoinPool 的 Thread
    ...
    因为它是活泼的......并且在这一点上,我们消费值(value),值(value)可能已经交付,所以我们只是阅读volatile阅读器线程(线程 Main)上的字段,否则,线程 Main 将设置 acceptor所以稍后会在 ForkJoinPool 上调用接受器线。

    对,这就是为什么当你使用 fromFuturesubscribeOn , 不保证 subscribeOn线程将观察给定 CompletableFuture 的值.

    这就是为什么publishOn是确保值处理发生在所需线程上的唯一方法。

    好的,我应该使用 publishOn一直往下???

    是和不是。这取决于。

    如果您使用 Mono - 在 99% 的情况下,您可以使用 publishOn如果您想确保您的数据处理发生在特定线程上 - 请始终使用 publishOn .

    不用担心潜在的开销 , Project Reactor 会照顾你,即使你不小心使用了它。 Project Reactor 有几个优化可以取代你的publishOnsubscribeOn (如果它在不破坏行为的情况下是安全的)在运行时,所以你会得到最好的。

    第 2 部分。跌入 Scheduelr 的兔子洞s

    永远不要使用 Schedulers.immediate()
    它几乎是无操作调度程序,基本上可以
    Schedulers.immediate().scheduler(runnable) {
       runnable.run()
    }
    

    对,它对 react 堆用户没有任何用处,我们仅将其用于内部需求。

    好的,那么我如何使用调度程序在命令式世界中作为执行程序使用它

    有两种选择:

    快速路径:分步指南

    1.a) 创建有界Executor . (例如 Executors.fixed... )
    1.b) 创建有界ScheduledExecutorService如果你想获得周期性任务和延迟任务的力量
    2) 创建一个Scheduler从您的执行人使用 Schedulers.fromExecutorXXX API
    3) 使用您的有界Executor在命令式世界中,使用您的 Scheduler它是 react 世界的有界包装

    漫长的道路

    快来了...

    第 3 部分。如何序列化执行。

    快来了

    关于java - 如何将 Project Reactor 的调度程序与基于 Executor 的库一起使用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60783347/

    相关文章:

    javascript - DynamoDB 的 LocalSecondaryIndexes 出现 "key schema too big"错误?

    amazon-dynamodb - DynamoDB 通过检查属性值长度进行条件更新

    Java:颜色数组不会循环遍历

    java - JButton 无法更改线条的颜色

    javamail : how to have two set of (System. getProperties())

    java - MVC 拦截器 vs Spring 安全过滤器 vs 其他东西......?

    apache-kafka - 融合模式注册表在启动时失败并出现 NoSuchMethodError

    连接到 Amazon AWS DynamoDB NoSQL 的 Android 应用程序

    apache-kafka - -bash : kafka-server-start. sh:找不到命令

    multithreading - Apache kafka消息调度和负载均衡