java - CompletableFuture——聚合 future 以快速失败

标签 java java.util.concurrent completable-future

我一直在使用 CompletableFuture.allOf(...) 助手来创建聚合 future ,只有当它们的复合 future 被标记为完成时,这些 future 才会“完成”,即:

CompletableFuture<?> future1 = new CompletableFuture<>();
CompletableFuture<?> future2 = new CompletableFuture<>();
CompletableFuture<?> future3 = new CompletableFuture<>();

CompletableFuture<?> future = CompletableFuture.allOf(future1, future2, future3);

我希望此功能略有不同,其中当满足以下条件时, future 的总和是市场完整的:

  • 所有 future 均已成功完成
  • 任何一个 future 已经完成失败

在后一种情况下,聚合 Future 应该(异常(exception)地)立即完成,而不必等待其他 Future 完成,即 fail-fast

为了说明这一点与 CompletableFuture.allOf(...) 的对比,请考虑以下内容:

// First future completed, gotta wait for the rest of them...
future1.complete(null);
System.out.println("Future1 Complete, aggregate status: " + future.isDone());

// Second feature was erroneous! I'd like the aggregate to now be completed with failure
future2.completeExceptionally(new Exception());
System.out.println("Future2 Complete, aggregate status: " + future.isDone());

// Finally complete the third future, that will mark the aggregate as done
future3.complete(null);
System.out.println("Future3 Complete, aggregate status: " + future.isDone());

使用 allOf(...),此代码产生:

Future1 Complete, aggregate status: false
Future2 Complete, aggregate status: false
Future3 Complete, aggregate status: true

而我的替代聚合实现将在 Feature2 完成后返回“true”,因为这是一个异常(exception)。


我在 Java 标准库中找不到任何可以帮助我实现此目的的实用程序,这感觉很奇怪……因为它是一个相对普通的用例。

看看 CompletableFuture.allOf(...) 的实现,很明显这些场景背后的逻辑相当复杂。我不愿意自己写这个,我想知道是否有其他选择?

最佳答案

虽然在语法上不如 CompletableFuture.allOf(...) 方法甜美,但 thenCompose(...) 似乎可以提供解决方案:

CompletableFuture<?> future = future1.thenCompose((f) -> future2).thenCompose((f) -> future3);

这将产生所需的:

Future1 Complete, aggregate status: false
Future2 Complete, aggregate status: true
Future3 Complete, aggregate status: true

这可以包含在一个辅助方法中,该方法会为调用者提供一些语法上的细节:

private static CompletableFuture<?> composed(CompletableFuture<?> ... futures) {

    // Complete when ALL the underlying futures are completed
    CompletableFuture<?> allComplete = CompletableFuture.allOf(futures);

    // Complete when ANY of the underlying futures are exceptional
    CompletableFuture<?> anyException = new CompletableFuture<>();
    for (CompletableFuture<?> completableFuture : futures) {
        completableFuture.exceptionally((t) -> {
            anyException.completeExceptionally(t);
            return null;
        });
    }

    // Complete when either of the above are satisfied
    return CompletableFuture.anyOf(allComplete, anyException);
}

允许:

CompletableFuture<?> future = composed(future1, future2, future3);

关于java - CompletableFuture——聚合 future 以快速失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33783561/

相关文章:

java - 如果 completableFuture 失败,如何记录?

asynchronous - CompletableFuture.exceptionally 与 executor

java - 在Junit中的afterclass中添加代码

java - 处理 fragment 中的单选组和按钮

java - Apache HTTP 客户端 : build simulator using multithreaded environment

java - 使用 java.util.concurrent 阻止操作,直到列表变为非空

Java Future - Spring Authentication 在 AuditorAware 中为空

java - 使用 Hibernate 用 transient 对象更新持久对象

java - Spring 集成: Multiple Application Integration using Spring Integration

java - 有ConcurrentHashMap为什么还要synchronizedMap()?