java - Mono.toFuture() 是阻塞的吗?

标签 java reactive-programming apache-commons-httpclient reactor

来自Official Documentation of Mono#block()据说:

Subscribe to this Mono and block indefinitely until a next signal is received. Returns that value, or null if the Mono completes empty. In case the Mono errors, the original exception is thrown (wrapped in a RuntimeException if it was a checked exception).

因此可以肯定 block() 方法是阻塞的,并且在 block() 解析之前它不会执行下一行。

但我的困惑是,当我使用 toFuture() 时,我期望它是非阻塞的,但它的行为与 block 方法完全相同。并在 Documentation of Mono#toFuture()据称:

Transform this Mono into a CompletableFuture completing on onNext or onComplete and failing on onError.

Mono#toFuture()

不太清楚。本文档中没有任何地方说 Mono#toFuture() 正在阻塞

  1. 请确认 toFuture() 方法是阻塞还是非阻塞?
  2. 另外,如果是非阻塞,那么哪个线程将负责执行CompletableFuture内的代码?

更新:添加代码片段

使用Mono.block()方法:

    long time = System.currentTimeMillis();
    String block = Mono.fromCallable(() -> {
        logger.debug("inside in fromCallable() block()");
        //Upstream httpcall with apache httpClient().
        // which takes atleast 1sec to complete.
        return "Http response as string";
    }).block();
    logger.info("total time needed {}", (System.currentTimeMillis()-time));

    return CompletableFuture.completedFuture(block);
<小时/>

使用Mono.ToFuture()方法:

    long time = System.currentTimeMillis();
    CompletableFuture<String> toFuture = Mono.fromCallable(() -> {
        logger.debug("inside in fromCallable() block()");
        //Upstream httpcall with apache httpClient().
        // which takes atleast 1sec to complete.
        return "Http response as string";
    }).toFuture();
    logger.info("total time needed {}", (System.currentTimeMillis()-time));
    return toFuture;

这两个代码片段的行为完全相同。

最佳答案

--编辑:我错了。 mono.toFuture() 不会阻塞 --

mono.toFuture() 不会阻塞。看看这个测试:

    @Test
    void testMonoToFuture() throws ExecutionException, InterruptedException {
        System.out.println(LocalTime.now() + ": start");
        Mono<String> mono = Mono.just("hello StackOverflow")
            .delayElement(Duration.ofMillis(500))
            .doOnNext((s) -> System.out.println(LocalTime.now() + ": mono completed"));
        Future<String> future = mono.toFuture();
        System.out.println(LocalTime.now() + ": future created");
        String result = future.get();
        System.out.println(LocalTime.now() + ": future completed");
        assertThat(result).isEqualTo("hello StackOverflow");
    }

这是结果:

20:18:49.557: start
20:18:49.575: future created
20:18:50.088: mono completed
20:18:50.088: future completed

future 几乎立刻就被创造出来了。半秒后,单声道完成,紧接着, future 完成。这正是我所期望发生的事情。

那么为什么单声道在问题中提供的示例中看起来会阻塞?这是因为 mono.fromCallable() 的工作方式。该可调用实际运行的时间和地点? mono.fromCallable() 不会产生额外的线程来完成这项工作。从我的测试来看,当您第一次在单声道上调用 subscribe() 或 block() 或类似的东西时,可调用对象似乎会运行,并且它将在执行该操作的线程中运行。

这是一个测试,表明如果您使用 fromCallable() 创建一个 mono,subscribe 将导致可调用对象在主线程中执行,甚至 subscribe() 方法也会显得阻塞。

    @Test
    void testMonoToFuture() throws ExecutionException, InterruptedException {
        System.out.println(LocalTime.now() + ": start");
        System.out.println("main thread: " + Thread.currentThread().getName());
        Mono<String> mono = Mono.fromCallable(() -> {
                System.out.println("callabel running in thread: " + Thread.currentThread().getName());
            Thread.sleep(1000);
            return "Hello StackOverflow";
            })
            .doOnNext((s) -> System.out.println(LocalTime.now() + ": mono completed"));
        System.out.println("before subscribe");
        mono.subscribe(System.out::println);
        System.out.println(LocalTime.now() + ": after subscribe");
    }

结果:

20:53:37.071: start
main thread: main
before subscribe
callabel running in thread: main
20:53:38.099: mono completed
Hello StackOverflow
20:53:38.100: after subscribe

结论:mono.toFuture() 并不比 mono.subscribe() 更具阻塞性。如果您想异步执行某些代码,则不应使用 Mono.fromCallable()。您可以考虑使用 Executors.newSingleThreadExecutor().submit(someCallable)

作为引用,这是我最初的(错误的)答案,我贬低了 mono.block() 方法,该方法肯定是由比我更了解 Java 和编码的人编写的。我想这是关于谦逊的个人教训。

下面的一切都是废话

我想验证它到底是如何工作的,所以我编写了一些测试。不幸的是,事实证明 mono.toFuture() 确实是阻塞的,并且结果是同步计算的。老实说,我不知道你为什么会使用这个功能。 Future 的全部意义在于保存异步计算的结果。

这是我的测试:

@Test
  void testMonoToFuture() throws ExecutionException, InterruptedException {
    Mono<Integer> mono = Mono.fromCallable(() -> {
      System.out.println("start mono");
      Thread.sleep(1000);
      System.out.println("mono completed");
      return 0;
    });
    Future<Integer> future = mono.toFuture();
    System.out.println("future created");
    future.get();
    System.out.println("future completed");
  }

结果:

start mono
mono completed
future created
future completed

这是 monoToFuture() 的实现,其工作方式与我期望的方式相同:

@Test
  void testMonoToFuture() throws ExecutionException, InterruptedException {
    Mono<Integer> mono = Mono.fromCallable(() -> {
      System.out.println("start mono");
      Thread.sleep(1000);
      System.out.println("mono completed");
      return 0;
    });
    Future<Integer> future = monoToFuture(mono, Executors.newSingleThreadExecutor());
    System.out.println("future created");
    future.get();
    System.out.println("future completed");
  }

  private <T> Future<T> monoToFuture(Mono<T> mono, ExecutorService executorService){
    return executorService.submit((Callable<T>) mono::block);
  }

结果:

future created
start mono
mono completed
future completed

关于java - Mono.toFuture() 是阻塞的吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58504527/

相关文章:

java - RCP - 应用程序没有 Activity 窗口

c# - 等待 IObservable 获取所有元素错误

system.reactive - 是否有 Rx 运算符用于仅在流 2 发出事物时组合来自流 1 和 2 的最新数据?

java - 如何在 Commons HttpClient 3.x 中获取当前传输速率

java - 使用有效的客户端证书时出现 HttpClient 403 错误

java - GWT 设计器不起作用

java - 在服务类中使用 MediaPlayer...?

java - 我应该如何实现 SSL?

ios - 如何从 ReactiveCocoa 信号中获取 "old value"?

java - commons http 客户端 - 协商时的 kerberos token 有\r\n(回车换行)字符