我需要创建一个 REST 端点,它将“同步”由 JMS 工作的后端服务的请求和响应。换句话说,我的端点应该向 JMS 输入队列发送消息,在 JMS 输出队列中等待响应。如果在超时时间内没有响应,则错误将返回给消费者。对于消费者来说,这个端点应该看起来像一个正常的同步请求/响应。
目前我已经使用 java.util.concurrent.Exchanger 实现了它。 我的代码(简化):
REST 端点:
@RestController
public class Endpoint {
private ConcurrentMap<String, Exchanger> exchangers = new ConcurrentHashMap<>();
@GetMapping("/data/{requestId}")
public ResponseEntity<String> getData(@Parameter(in = ParameterIn.PATH, required = true) String requestId) {
Exchanger<String> syncExchanger = createAndPutIfNotExists(requestId);
sendToJMS(requestId);
int timeout = 30;
// wait for JMS response and return it
return waitForResponse(syncExchanger, requestId, timeout);
}
private synchronized Exchanger<String> createAndPutIfNotExists(String requestId) {
if (exchangers.get(requestId) != null) {
throw new BadHeaderException("Duplicate requestId");
}
Exchanger<String> exchanger = new Exchanger<>();
exchangers.put(requestId, exchanger);
return exchanger;
}
private String waitForResponse(Exchanger<String> exchanger, String requestId, int timeout) {
try {
return exchanger.exchange(null, timeout, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "interrupted";
} catch (TimeoutException e) {
throw new TimeoutException("timeout on waiting JMS response.", e);
} finally {
exchangers.remove(requestId);
}
}
@JmsListener(destination = "${jms.outputTopic}")
public void onMessage(Message m) throws JMSException {
String requestId = m.getStringProperty("RequestId");
String payload = m.getBody();
Exchanger<String> exchanger = exchangers.get(requestId );
if (exchanger != null) {
try {
exchanger.exchange(payload);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
exchangers.remove(requestId );
}
}
}
}
该解决方案有效。但它在等待响应时阻塞请求线程。 那么Web服务器线程池在高负载时就会超出限制。
有没有办法以非阻塞的方式做到这一点?
类似这样的事情:
@GetMapping("/data/{requestId}")
public CompletableFuture<String> getData() {
return CompletableFuture.supplyAsync(() -> {
sendToJMS(requestId);
// How to wait for JMS response with some timeout ?
});
}
@JmsListener(destination = "${jms.outputTopic}")
public void onMessage(Message m) throws JMSException {
String requestId = m.getStringProperty("RequestId");
String payload = m.getBody();
// How to "complete" CompletableFuture ?
}
最佳答案
Spring 接受 CompletableFuture
作为 Controller 中的返回类型,因此您可以在 createAndPutIfNotExists()
中创建一个返回类型,并在 onMessage()
中完成它.
将您的交易所
映射替换为 future
映射:
private ConcurrentMap<String, CompletableFuture<String>> futures = new ConcurrentHashMap<>();
然后调整发送部分:
@GetMapping("/data/{requestId}")
public CompletableFuture<String> getData(@PathParam("requestId") String requestId) {
CompletableFuture<String> future = createAndPutIfNotExists(requestId);
sendToJMS(requestId);
int timeout = 30;
CompletableFuture<String> result = future.orTimeout(timeout, TimeUnit.SECONDS);
result.thenRun(() -> futures.remove(requestId, future));
return result;
}
private synchronized CompletableFuture<String> createAndPutIfNotExists(String requestId) {
if (futures.get(requestId) != null) {
throw new BadHeaderException("Duplicate requestId");
}
CompletableFuture<String> future = new CompletableFuture<>();
futures.put(requestId, future);
return future;
}
请注意,超时处理是使用 Java 9 的 orTimeout()
方法执行的。如果您使用的是 Java 8,则需要 custom timeout hanlding .
您可能还需要执行一些 thenApplyAsync(s -> s, executor)
技巧,将响应提交移出 JMS/超时处理线程。
最后,在收到响应时只需complete()
future:
@JmsListener(destination = "${jms.outputTopic}")
public void onMessage(Message m) throws JMSException {
String requestId = m.getStringProperty("RequestId");
String payload = m.getBody();
CompletableFuture<String> future = futures.get(requestId);
if (future != null) {
try {
future.complete(payload);
} finally {
futures.remove(requestId, future);
}
}
}
关于java - 同步异步后端,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61884367/