java - 同步异步后端

标签 java synchronization jms nonblocking completable-future

我需要创建一个 REST 端点,它将“同步”由 JMS 工作的后端服务的请求和响应。换句话说,我的端点应该向 JMS 输入队列发送消息,在 JMS 输出队列中等待响应。如果在超时时间内没有响应,则错误将返回给消费者。对于消费者来说,这个端点应该看起来像一个正常的同步请求/响应。

目前我已经使用 java.util.concurrent.Exchanger 实现了它。 我的代码(简化):

REST 端点:

@RestController
public class Endpoint {

   private ConcurrentMap<String, Exchanger> exchangers = new ConcurrentHashMap<>();

   @GetMapping("/data/{requestId}")
   public ResponseEntity<String> getData(@Parameter(in = ParameterIn.PATH, required = true) String requestId) {
      Exchanger<String> syncExchanger = createAndPutIfNotExists(requestId);
      sendToJMS(requestId);
      int timeout = 30;
      // wait for JMS response and return it
      return waitForResponse(syncExchanger, requestId, timeout);
   }

   private synchronized Exchanger<String> createAndPutIfNotExists(String requestId) {
        if (exchangers.get(requestId) != null) {
            throw new BadHeaderException("Duplicate requestId");
        }
        Exchanger<String> exchanger = new Exchanger<>();
        exchangers.put(requestId, exchanger);
        return exchanger;
   }

   private String waitForResponse(Exchanger<String> exchanger, String requestId, int timeout) {
        try {
            return exchanger.exchange(null, timeout, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (TimeoutException e) {
            throw new TimeoutException("timeout on waiting JMS response.", e);
        } finally {
            exchangers.remove(requestId);
        }
   }

   @JmsListener(destination = "${jms.outputTopic}")
   public void onMessage(Message m) throws JMSException {
      String requestId = m.getStringProperty("RequestId");
      String payload = m.getBody();
      Exchanger<String> exchanger = exchangers.get(requestId );

      if (exchanger != null) {
            try {
                exchanger.exchange(payload);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                exchangers.remove(requestId );
            }
      }
   }
}

该解决方案有效。但它在等待响应时阻塞请求线程。 那么Web服务器线程池在高负载时就会超出限制。

有没有办法以非阻塞的方式做到这一点?

类似这样的事情:

@GetMapping("/data/{requestId}")
   public CompletableFuture<String> getData() {
      return CompletableFuture.supplyAsync(() -> {
        sendToJMS(requestId);

        // How to wait for JMS response with some timeout ?

      });
   }

@JmsListener(destination = "${jms.outputTopic}")
   public void onMessage(Message m) throws JMSException {
      String requestId = m.getStringProperty("RequestId");
      String payload = m.getBody();

      // How to "complete" CompletableFuture ?

   }

最佳答案

Spring 接受 CompletableFuture 作为 Controller 中的返回类型,因此您可以在 createAndPutIfNotExists() 中创建一个返回类型,并在 onMessage() 中完成它.

将您的交易所映射替换为 future 映射:

private ConcurrentMap<String, CompletableFuture<String>> futures = new ConcurrentHashMap<>();

然后调整发送部分:

@GetMapping("/data/{requestId}")
public CompletableFuture<String> getData(@PathParam("requestId") String requestId) {
    CompletableFuture<String> future = createAndPutIfNotExists(requestId);
    sendToJMS(requestId);
    int timeout = 30;
    CompletableFuture<String> result = future.orTimeout(timeout, TimeUnit.SECONDS);
    result.thenRun(() -> futures.remove(requestId, future));
    return result;
}

private synchronized CompletableFuture<String> createAndPutIfNotExists(String requestId) {
    if (futures.get(requestId) != null) {
        throw new BadHeaderException("Duplicate requestId");
    }
    CompletableFuture<String> future = new CompletableFuture<>();
    futures.put(requestId, future);
    return future;
}

请注意,超时处理是使用 Java 9 的 orTimeout() 方法执行的。如果您使用的是 Java 8,则需要 custom timeout hanlding .

您可能还需要执行一些 thenApplyAsync(s -> s, executor) 技巧,将响应提交移出 JMS/超时处理线程。

最后,在收到响应时只需complete() future:

@JmsListener(destination = "${jms.outputTopic}")
public void onMessage(Message m) throws JMSException {
    String requestId = m.getStringProperty("RequestId");
    String payload = m.getBody();
    CompletableFuture<String> future = futures.get(requestId);

    if (future != null) {
        try {
            future.complete(payload);
        } finally {
            futures.remove(requestId, future);
        }
    }
}

关于java - 同步异步后端,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61884367/

相关文章:

java - 如果需要更多时间,则终止 threadexecutor 中的任务

java - 具有 Closeable 参数的方法的单元测试

Java Opencv Mat构造函数不匹配

java - 如何在多个子报表上使用相同的 JRBeanCollectionDataSource?

java - Eclipse RCP SWT 弹出窗口问题

java - new MQConnectionFactory() 在 JBoss 中抛出 NullPointerException

javascript - 带循环的 NodeJS 异步控制流

java同步机制

c++ - 为什么队列的所有元素都一样,元素在posix线程中加入队列

java - JMS Websphere MQ BytesMessge 和 TextMessage