如果我正在异步监听 Spring AMQP 消息,我该如何使用发送方提供的 ReplyTo 队列和关联 ID 进行响应?
@Override
public void onMessage(Message message) {
byte[] bytes = message.getBody();
String body = new String (bytes);
logger.info(application + " processing message: \n" + body);
//some business logic
//now I want to respond to the replyto queue with the correlation ID
//rabbitTemplate.????
}
最佳答案
根据有用的反馈,这里是使用 Spring Messaging 的等效解决方案:
public void onMessage(Message message) {
//mock response body
String body = "{ \"processing\": \"123456789\"}";
//mock response properties
MessageProperties properties = new MessageProperties();
properties.setContentType(MediaType.APPLICATION_JSON.toString());
properties.setContentEncoding(StandardCharsets.UTF_8.name());
//return the Correlation ID if present
if (message.getMessageProperties().getCorrelationId() != null) {
properties.setCorrelationId(message.getMessageProperties().getCorrelationId());
}
//create and return the response message
Message responseMessage = new Message(body.getBytes(), properties);
rabbitTemplate.send(message.getMessageProperties().getReplyTo(), responseMessage);
logger.info("Processed and returned message");
对于刚接触 AMQP 的人来说,令人困惑的部分是知道回复消息属性是回复时的“路由键”值(send 或 convertAndSend 方法)。并且临时队列是在默认交换中创建的。对于需要相关 ID 的永久回复队列(临时回复队列不需要),情况并非总是如此。
关于Spring AMQP 响应来自 MessageListener 的 ReplyTo 队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35179796/