java - SpringAMQP 中的 Reply-To 是否已预先设置?

标签 java rabbitmq spring-boot spring-amqp

我正在使用 SpringBoot 启动连接到 RabbitMQ 队列的 SpringAMQP 应用程序。我希望能够从生产者发送消息,指定回复队列,以便消费者只需要发送而不必调查目的地(因此不必在消息本身中传递回复数据)。

这是我的配置(在生产者和消费者之间共享)

private static final String QUEUE_NAME = "testQueue";
private static final String ROUTING_KEY = QUEUE_NAME;
public static final String REPLY_QUEUE = "replyQueue";
private static final String USERNAME = "guest";
private static final String PASSWORD = "guest";
private static final String IP = "localhost";
private static final String VHOST = "/";
private static final int PORT = 5672;

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    amqpAdmin().declareQueue(new Queue(QUEUE_NAME));
    amqpAdmin().declareQueue(new Queue(REPLY_QUEUE));
    return template;
}

@Bean
public AmqpAdmin amqpAdmin() {
    return new RabbitAdmin(connectionFactory());
}

@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(IP);
    connectionFactory.setUsername(USERNAME);
    connectionFactory.setPassword(PASSWORD);
    connectionFactory.setVirtualHost(VHOST);
    connectionFactory.setPort(PORT);
    return connectionFactory;
}

我发送的消息如下:

public Object sendAndReply(String queue, String content){
        return template.convertSendAndReceive(queue, new Data(content), new MessagePostProcessor() {

            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setReplyTo(ReplyTester.REPLY_QUEUE);
                return message;
            }
        });
    }

等待回复如下:

public void replyToQueue(String queue){
    template.receiveAndReply(queue, new ReceiveAndReplyCallback<Data, Data>() {
        @Override
        public Data handle(Data payload) {
            System.out.println("Received: "+payload.toString());
            return new Data("This is a reply for: "+payload.toString());
        }
    });
}

但是发送时,我收到以下异常:

Exception in thread "main" org.springframework.amqp.UncategorizedAmqpException: java.lang.IllegalArgumentException: Send-and-receive methods can only be used if the Message does not already have a replyTo property.
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:66)
    at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:112)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:841)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:820)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceiveWithTemporary(RabbitTemplate.java:705)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doSendAndReceive(RabbitTemplate.java:697)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:673)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertSendAndReceive(RabbitTemplate.java:663)
    at prodsend.Prod.sendAndReply(ReplyTester.java:137)
    at prodsend.ReplyTester.sendMessages(ReplyTester.java:49)
    at prodsend.ReplyTester.main(ReplyTester.java:102)
Caused by: java.lang.IllegalArgumentException: Send-and-receive methods can only be used if the Message does not already have a replyTo property.
    at org.springframework.util.Assert.isNull(Assert.java:89)
    at org.springframework.amqp.rabbit.core.RabbitTemplate$6.doInRabbit(RabbitTemplate.java:711)
    at org.springframework.amqp.rabbit.core.RabbitTemplate$6.doInRabbit(RabbitTemplate.java:705)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:835)
    ... 8 more

ReplyTest.137 行指向上面 sendAndReply 方法中的 return 行。

<小时/>

编辑: 这是上面提到的 Data 类:)

class Data{
    public String d;
    public Data(String s){ d = s; }
    public String toString() { return d; }
}

最佳答案

来自documentation :

Basic RPC pattern. Send a message to a default exchange with a specific routing key and attempt to receive a response. Implementations will normally set the reply-to header to an exclusive queue and wait up for some time limited by a timeout.

因此,方法 convertSendAndReceive 处理设置 replyTo header 并返回 Messaage - 响应。这是一种同步模式 - RPC。

如果您想异步执行此操作(您似乎如此),请不要使用此方法。使用适当的 convertAndSend 方法并使用适当的 MessagePostProcessor 添加您的 replyTo header 。

由于这是异步的,您需要注册一个单独的处理程序来接收回复。这需要在将消息发送给另一方之前完成。该处理程序将在发送消息后的某个时刻被调用 - 时间未知。阅读 Spring AQMP Documentation3.5.2 异步消费者部分.

所以,异步流程:

  1. 发送者在 replyTo 队列上注册一个处理程序
  2. 发件人发送带有 replyTo 设置的消息
  3. 客户端调用receiveAndReply,处理消息,并向replyTo发送回复
  4. 触发发送者回调方法

同步流程为:

  1. 发送者使用 sendAndReceive 和 block 发送消息
  2. 客户端调用receiveAndReply,处理消息,并向replyTo发送回复
  3. 发送者收到回复,唤醒并处理它

所以后一种情况需要发送者等待。由于您使用的是 receiveXXX 而不是注册异步处理程序,因此如果客户端需要一段时间才能调用 receiveXXX,则发送方可能会等待很长时间。

顺便说一句,如果您想使用同步方法但使用特定的replyTo,您始终可以调用setReplyQueue。对于我提到的客户端懒得阅读消息或忘记回复的情况,还有一个 setReplyTimeout

关于java - SpringAMQP 中的 Reply-To 是否已预先设置?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25782531/

相关文章:

java - 在 Scala 中为 Java 对象创建隐式 View 的最佳方法

javascript - JXBrowser JSFunctionCallback 和 IFrame

java - 在Spring CachingConnectionFactory中设置connectionTimeouts和socketTimeouts

spring - 使用 RabbitTemplate 或 AmqpTemplate 哪一个?

java - Spring Boot 集成测试 - 不需要的模拟

java - Intellij 12.1 中的自定义 setter

属性文件的java类路径问题

mongodb - 具有 "get or block"操作的数据存储?

spring - 加载类 "org.slf4j.impl.StaticLoggerBinder"失败,Spring Boot

java - @GetMapping 和 RSocketServer 与 spring-boot-starter-rsocket