java - 在 Java 中嵌套 CompletionStages 以使内部 block 在外部 block 之前运行

标签 java multithreading asynchronous java-8 java.util.concurrent

我编写了如下方法:

 public static CompletionStage<Tuple2<ObjectNode, String>> calculateTemplateTreeAndKeys(
  String content,
  RequestContext context,
  MetricsClient metricsClient,
  JdbcSession jdbcSession) {

AtomicReference<ObjectNode> templateTreeHolder = new AtomicReference<>();
templateTreeHolder.set(Json.rootNode());

return getTemplateIds(context, metricsClient, jdbcSession, content)
    .thenCompose(
        templateIds -> {
          templateIds.map(
              id ->
                  // do something and return CompletionStage<String>
                      .thenAccept(
                          tree -> {
                            templateTreeHolder.set(
                                (ObjectNode)
                                    templateTreeHolder.get().set(id, Json.readTree(tree)));

                            System.out.println(
                                "From inner function: " + templateTreeHolder.get());
                          }));
          return CompletableFuture.completedFuture(NotUsed.getInstance());
        })
    .thenApply(
        notUsed -> {
          String includedTemplateIdsStr =
              getKeysFromTemplateTree(templateTreeHolder.get()).toJavaList().toString();

          System.out.println("From outer function: " + templateTreeHolder.get());

          return Tuple.of(templateTreeHolder.get(), includedTemplateIdsStr);
        });

我希望内部 block 先处理并更新 templateTreeHolder .thenApply 被调用,以便 templateTreeHolder 将保存正确的数据以返回。但是,.thenApply block 在内部 .thenAccept block 之前进行处理。

从控制台输出序列:

From outer function: {}
From inner function: {"f9406341-c62a-411a-9389-00a62bd63629":{}}

我不确定我在链接 CompletionStages 时做错了什么,请告诉我如何确保内部 block 在外部 block 之前完成?

最佳答案

您的函数传递给 thenCompose返回一个已经完整的 future ,即 return CompletableFuture.completedFuture(NotUsed.getInstance());这允许相关阶段立即进行。这似乎与传递给templateIds.map(…)的函数的求值相冲突。 ,这显然是异步发生的。

通常,您应该避免完成阶段和副作用依赖性的这种混合,特别是当它们的异步评估未建模为先决条件完成阶段时。

但是如果您别无选择,您可以解决此问题:

return getTemplateIds(context, metricsClient, jdbcSession, content)
    .thenCompose(
        templateIds -> {
          // create an initially uncompleted stage
          CompletableFuture<Object> subStage = new CompletableFuture<>(); 
          templateIds.map(
              id ->
                  // do something and return CompletionStage<String>
                      .thenAccept(
                          tree -> {
                            templateTreeHolder.set(
                                (ObjectNode)
                                    templateTreeHolder.get().set(id, Json.readTree(tree)));

                            System.out.println(
                                "From inner function: " + templateTreeHolder.get());
                            // complete when all work has been done
                            subStage.complete(null);
                          }));
          // use this stage for dependent actions
          return subStage;
        })
    .thenApply(
        notUsed -> {
          String includedTemplateIdsStr =
              getKeysFromTemplateTree(templateTreeHolder.get()).toJavaList().toString();

          System.out.println("From outer function: " + templateTreeHolder.get());

          return Tuple.of(templateTreeHolder.get(), includedTemplateIdsStr);
        });

在上面的代码中,如果您的操作在尝试完成之前因异常而失败,则 future 将永远不会完成。一般模式如下:

CompletableFuture<Type> stage = new CompletableFuture<>();
…
try {
    code that will eventually call complete on stage
}
catch(Throwable t) {
    stage.completeExceptionally(t);
}

但是,当然,当应该完成该阶段的代码也承担异步处理时,情况会变得更加复杂,因此您必须保护尝试提交实际完成代码的代码以及实际完成代码.

因此,内部代码的更详细版本将如下所示:

CompletableFuture<Object> subStage = new CompletableFuture<>();
try {
    templateIds.map(
        id ->
            // do something and return CompletionStage<String>
            .thenAccept(
                tree -> {
                  templateTreeHolder.set(
                      (ObjectNode)
                          templateTreeHolder.get().set(id, Json.readTree(tree)));

                  System.out.println(
                      "From inner function: " + templateTreeHolder.get());
                })
            .whenComplete((v,t) -> {
                // complete when all work has been done
                if(t != null) subStage.completeExceptionally(t);
                else subStage.complete(v);
            }));
} catch(Throwable t) {
    subStage.completeExceptionally(t);
}
// use this stage for dependent actions
return subStage;

(也许,“做某事并返回 CompletionStage”也必须用 try { … } catch(Throwable t) { subStage.completeExceptionally(t); } 来保护)

关于java - 在 Java 中嵌套 CompletionStages 以使内部 block 在外部 block 之前运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52887027/

相关文章:

java - 调用异步函数

java - Swing - 典型桌面应用程序中的并发

java - 使用用户输入搜索对象数组,验证输入

java - 从 Java 将 REFCURSOR 类型变量发送到 Oracle 存储过程

ios - 核心数据 3 托管对象上下文

json - Swift 完成处理程序问题

C# async/await 进度报告不符合预期顺序

java - 有没有一种 Spring 方法来检查网页是否仍然存在?

c++ - 尽管有线程,Qt GUI 仍挂起

java - Spring Batch 中的多线程步骤和本地分区有什么区别?