java-8 - 如何在链式 CompletableFuture 中扇出?

标签 java-8 parallel-processing completable-future

我想链接一个 CompletableFuture 以便它在处理过程中散开。我的意思是我有一个针对列表的开放 CompletableFuture,我想对该列表中的每个项目应用计算。

第一步是调用发出异步调用的 m_myApi.getResponse(request, executor)。

该异步调用的结果有一个 getCandidates 方法。我想并行解析所有这些候选人。

目前,我的代码会依次解析它们

public CompletableFuture<List<DOMAIN_OBJECT>> parseAllCandidates(@Nonnull final REQUEST request, @Nonnull final Executor executor)
{
        CompletableFuture<RESPONSE> candidates = m_myApi.getResponse(request, executor);
        return candidates.thenApplyAsync(response -> response.getCandidates()
                                                   .stream()
                                                   .map(MyParser::ParseCandidates)
                                                   .collect(Collectors.toList()));
}

我想要这样的东西:
public CompletableFuture<List<DOMAIN_OBJECT>> parseAllCandidates(@Nonnull final REQUEST request, @Nonnull final Executor executor)
{
        CompletableFuture<RESPONSE> candidates = m_myApi.getResponse(request, executor);
        return candidates.thenApplyAsync(response -> response.getCandidates()
                                                   .stream()
                                                   .PARSE_IN_PARALLEL_USING_EXECUTOR
}

最佳答案

this answer 中所述,如果 Executor碰巧是 Fork/Join 池,有一个(未记录的)功能,即在其工作线程之一中启动并行流将使用该执行程序执行并行操作。

当您想支持任意时Executor实现,事情更复杂。一种解决方案看起来像

public CompletableFuture<List<DOMAIN_OBJECT>> parseAllCandidates(
       @Nonnull final REQUEST request, @Nonnull final Executor executor)
{
    CompletableFuture<RESPONSE> candidates = m_myApi.getResponse(request, executor);
    return candidates.thenComposeAsync(
        response -> {
            List<CompletableFuture<DOMAIN_OBJECT>> list = response.getCandidates()
                .stream()
                .map(CompletableFuture::completedFuture)
                .map(f -> f.thenApplyAsync(MyParser::ParseCandidates, executor))
                .collect(Collectors.toList());
            return CompletableFuture.allOf(list.toArray(new CompletableFuture<?>[0]))
                .thenApplyAsync(x ->
                    list.stream().map(CompletableFuture::join).collect(Collectors.toList()),
                    executor);
        },
        executor);
}

第一个至关重要的事情是,我们必须在开始等待任何作业之前提交所有潜在的异步作业,以启用执行程序可能支持的最大并行度。因此,我们必须收集 List 中的所有 future 。在第一步。

在第二步中,我们可以遍历列表和 join所有 future 。如果 executor 是一个 Fork/Join 池并且 future 尚未完成,它会检测到这一点并启动一个补偿线程以重新获得配置的并行度。但是,对于任意执行器,我们不能假设这样的特性。最值得注意的是,如果执行程序是单线程执行程序,这可能会导致死锁。

因此,该解决方案使用 CompletableFuture.allOf只有当所有 future 都已经完成时,才执行迭代和加入所有 future 的操作。因此,该解决方案永远不会阻塞执行程序的线程,使其与任何 Executor 兼容。执行。

关于java-8 - 如何在链式 CompletableFuture 中扇出?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52998449/

相关文章:

java - ForkJoinPool和异步IO

Java 8 java.util.stream.流

python-3.x - 如何使用Python 3并行下载和解析HTML文件?

C# 和 SQL - 按照示例编写 Parallel.ForEach 循环来创建线程,但它抛出有关连接状态的异常

java CompletableFuture.thenCombine 返回 CompletableFuture 的 CompletableFuture

java - 为什么 CompletableFuture 中的断点也会停止主线程中的执行?

java - 为什么混合GC无法清除内存,而Full DC却可以?

java - 通过谓词查找第一个元素

python - 如何判断 apply_async 函数是否已启动或它是否仍在 multiprocessing.Pool 队列中

java - Java 8 中的 CompletableFuture