我正在尝试用 Java 通过 WebSocket 实现一个简单的类似 RPC(或请求-响应)系统(前端会有 JS,但我现在正在开发后端)。
我正在尝试应用Java CompletableFuture
模式来处理异步发送消息。但我目前陷入错误处理。
我有一个类(我们称之为 rpc 类),负责通过 WebSocket session 发送消息(此处使用 Spring WebSocket 支持类),然后等待“回复”类型消息,并将它们与待处理的请求并将内容返回给调用者。
流程是:
- 客户端代码调用 rpc 类上的方法,指定要在远程进程上调用的过程的名称、要向其发送消息的 session 以及要发送的参数映射。
- rpc 类使用另一个较低级别的类使用
Executor
异步发送消息。 (线程池),并收到CompletableFuture<Void>
对于“发送消息”操作 - 它将待处理的请求存储在映射中,构建
CompletableFuture<Map<String, Object>>
并将其与待处理的请求关联起来,并将它们存储在映射中。它返回完整的 future 。 - 当收到“回复”类型的消息时,会在同一类上调用一个方法,该方法会尝试将响应与待处理的请求之一进行匹配(它们有一个 ID),然后完成
CompletableFuture
以及响应中收到的内容。
因此涉及 3 个线程:调用者线程、发送消息的线程以及接收消息并完成 future 的线程。
现在,我应该如何处理消息发送过程中的错误(例如IO错误),以便返回completableFuture
也失败(或者可能实现重试策略,并超时......)?
下面是发送消息的rpc类方法的代码:
/**
* Invoke a remote procedure over WS on the specified session, with the given arguments.
* @param session The target session on which to send the RPC message
* @param target The name of the procedure to call
* @param arguments The arguments to be sent in the message
* @return
*/
public CompletableFuture<Map<String,Object>> invoke(WebSocketSession session, String target, Map<String, Object> arguments){
Invocation invocationMessage = new Invocation(target, arguments);
invocationMessage.setId(getNextId());
// completeable future for the result. It does nothing, will be completed when reply is received which happen in a different thread, see completeInvocation
CompletableFuture<Map<String, Object>> invocationFuture = new CompletableFuture<>();
CompletableFuture<Void> senderFuture = sender.sendMessage(session, invocationMessage);
// handle problem in the sending of the message
senderFuture.exceptionally(e -> {
// is this correct ??
invocationFuture.completeExceptionally(e);
return null;
});
// store the pending invocation in the registry
registry.addPendingInvocation(new PendingInvocation(invocationMessage, session, invocationFuture));
// return the future so the caller can have access to the result once it is ready
return invocationFuture;
}
最佳答案
最简单的方法是使用 thencompose()
简单地链接 future:
// completeable future for the result. It does nothing, will be completed when reply is received which happen in a different thread, see completeInvocation
CompletableFuture<Map<String, Object>> invocationFuture = new CompletableFuture<>();
CompletableFuture<Void> senderFuture = sender.sendMessage(session, invocationMessage);
// store the pending invocation in the registry
registry.addPendingInvocation(new PendingInvocation(invocationMessage, session, invocationFuture));
// return the future so the caller can have access to the result once it is ready
return senderFuture.thenCompose(__ -> invocationFuture);
如果 senderFuture
异常完成,返回的 future 也将异常完成,并且 CompletionException
将该异常作为其原因(参见 CompletionStage
api ) .
请注意,您可能还想解决其他问题:
- 如果出现异常,您是否也应该取消待处理的调用?
- 如果响应在调用
addPendingInitation
之前到达,会发生什么情况?您是否应该在调用sendMessage
之前调用它以避免出现问题? - 既然您没有对
invocableFuture
执行任何操作,那么在addPenidngInvocau
中创建它不是更好吗?
关于java - 如何处理 Java 8 CompletableFuture 中的错误?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39513431/