java - 如何处理 Java 8 CompletableFuture 中的错误?

标签 java multithreading concurrency java-8 completable-future

我正在尝试用 Java 通过 WebSocket 实现一个简单的类似 RPC(或请求-响应)系统(前端会有 JS,但我现在正在开发后端)。

我正在尝试应用Java CompletableFuture模式来处理异步发送消息。但我目前陷入错误处理。

我有一个类(我们称之为 rpc 类),负责通过 WebSocket session 发送消息(此处使用 Spring WebSocket 支持类),然后等待“回复”类型消息,并将它们与待处理的请求并将内容返回给调用者。

流程是:

  • 客户端代码调用 rpc 类上的方法,指定要在远程进程上调用的过程的名称、要向其发送消息的 session 以及要发送的参数映射。
  • rpc 类使用另一个较低级别的类使用 Executor 异步发送消息。 (线程池),并收到 CompletableFuture<Void>对于“发送消息”操作
  • 它将待处理的请求存储在映射中,构建 CompletableFuture<Map<String, Object>>并将其与待处理的请求关联起来,并将它们存储在映射中。它返回完整的 future 。
  • 当收到“回复”类型的消息时,会在同一类上调用一个方法,该方法会尝试将响应与待处理的请求之一进行匹配(它们有一个 ID),然后完成 CompletableFuture以及响应中收到的内容。

因此涉及 3 个线程:调用者线程、发送消息的线程以及接收消息并完成 future 的线程。

现在,我应该如何处理消息发送过程中的错误(例如IO错误),以便返回completableFuture也失败(或者可能实现重试策略,并超时......)?

下面是发送消息的rpc类方法的代码:

/**
 * Invoke a remote procedure over WS on the specified session, with the given arguments.
 * @param session The target session on which to send the RPC message
 * @param target The name of the procedure to call
 * @param arguments The arguments to be sent in the message
 * @return
 */
public CompletableFuture<Map<String,Object>> invoke(WebSocketSession session, String target, Map<String, Object> arguments){
    Invocation invocationMessage = new Invocation(target, arguments);
    invocationMessage.setId(getNextId());

    // completeable future for the result. It does nothing, will be completed when reply is received which happen in a different thread, see completeInvocation
    CompletableFuture<Map<String, Object>> invocationFuture = new CompletableFuture<>();

    CompletableFuture<Void> senderFuture = sender.sendMessage(session, invocationMessage);

    // handle problem in the sending of the message
    senderFuture.exceptionally(e -> {
        // is this correct ??
        invocationFuture.completeExceptionally(e);
        return null;
    });

    // store the pending invocation in the registry
    registry.addPendingInvocation(new PendingInvocation(invocationMessage, session, invocationFuture));

    // return the future so the caller can have access to the result once it is ready
    return invocationFuture;
}

最佳答案

最简单的方法是使用 thencompose() 简单地链接 future:

// completeable future for the result. It does nothing, will be completed when reply is received which happen in a different thread, see completeInvocation
CompletableFuture<Map<String, Object>> invocationFuture = new CompletableFuture<>();

CompletableFuture<Void> senderFuture = sender.sendMessage(session, invocationMessage);

// store the pending invocation in the registry
registry.addPendingInvocation(new PendingInvocation(invocationMessage, session, invocationFuture));

// return the future so the caller can have access to the result once it is ready
return senderFuture.thenCompose(__ -> invocationFuture);

如果 senderFuture 异常完成,返回的 future 也将异常完成,并且 CompletionException 将该异常作为其原因(参见 CompletionStage api ) .

请注意,您可能还想解决其他问题:

  • 如果出现异常,您是否也应该取消待处理的调用?
  • 如果响应在调用 addPendingInitation 之前到达,会发生什么情况?您是否应该在调用 sendMessage 之前调用它以避免出现问题?
  • 既然您没有对 invocableFuture 执行任何操作,那么在 addPenidngInvocau 中创建它不是更好吗?

关于java - 如何处理 Java 8 CompletableFuture 中的错误?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39513431/

相关文章:

java - 是否可以覆盖任何 Java 小程序的最大堆大小的默认设置?

java - 为什么我的 TCP 服务器套接字在一个客户端失去连接后关闭?

java - ExecutorService性能问题

java - 为什么 Java 中的 XML Dom 会报告额外的节点?

java - 使用 UTF-8 编码的 Tomcat 7

java - 在 javac 中强制执行包声明检查

java - 使用 tcp/ip 套接字的客户端-服务器中的竞争或死锁

c++ - 在代码的其他部分使用带有 lock_gard 的同一个互斥体而不使用它是否安全

sql - Postgres Serialized 似乎不像描述的那样工作

ios - 在 Core Data 中同时保存上下文在 iOS7 中不起作用