java - RabbitMQ 队列和路由键

标签 java spring rabbitmq queue

在文档中 https://docs.spring.io/spring-amqp/reference/htmlsingle/ 我明白了

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "myQueue", durable = "true"),
        exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
        key = "orderRoutingKey")
  )
  public void processOrder(Order order) {

  }

  @RabbitListener(bindings = @QueueBinding(
        value = @Queue,
        exchange = @Exchange(value = "auto.exch"),
        key = "invoiceRoutingKey")
  )
  public void processInvoice(Invoice invoice) {

  }

这里有1个queue和2个routing key,大家为他的方法 但是我的代码没有从 key 中获取消息!

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = DRIVER_QUEUE, durable = "true"),
            exchange = @Exchange(value = "exchange", ignoreDeclarationExceptions = "true", autoDelete = "true"),
            key = "order")
    )
    public String getOrders(byte[] message) throws InterruptedException {
         System.out.println("Rout order");
    }

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = DRIVER_QUEUE, durable = "true"),
            exchange = @Exchange(value = "exchange", ignoreDeclarationExceptions = "true", autoDelete = "true"),
            key = "invoice")
    )
    public String getOrders(byte[] message) throws InterruptedException {
         System.out.println("Rout invoice");
    }

他们都从队列中获取消息,但看不到 key ... 站点发送带有键“invoice”的队列消息,我在控制台“Route order”中看到 什么问题??非常感谢!

rabbitmq 3.7.3 Spring 4.2.9 org.springframework.amqp 1.7.5

最佳答案

错误是您将所有消息发送到同一个队列。

您应该为每个监听器使用不同的队列。您的绑定(bind)只是告诉带有 RK="invoice"和 RK="order"的消息必须进入同一个队列,而不是告诉监听器使用该 RK 处理队列元素。

你应该绑定(bind)例如通过键“invoice”交换到 DRIVER_QUEUE1(例如“queue-orders”)并通过键“order”交换到 DRIVER_QUEUE2(例如“queue-invoices”)。通过这种方式,您可以分离消息,并且可以放置两个监听器,一个用于发票,一个用于订单。例如。像这样:

@RabbitListener(queues = "queue-orders")
public void handleOrders(@Payload Order in,
      @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String key) {
   logger.info("Key: {}, msg: {}",key,in.toString());
}


@RabbitListener(queues = "queue-invoices")
public void handleInvoices(@Payload Invoice in, 
      @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String key) {
   logger.info("Key: {}, msg: {}",key,in.toString());
}

我不喜欢完整的注释,因为当代理配置完成时,恕我直言,完整注释变得无用(或者更好,添加对我无用的额外检查)。但如果你愿意,整个注释应该像

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "queue-orders", durable = "true"),
        exchange = @Exchange(value = "exchange", ignoreDeclarationExceptions = "true", autoDelete = "true"),
        key = "invoice")
)

然后你可以通过 convertAndSend(exchangename, routingkey, object) 发送消息,就像在

Order order = new Order(...);
rabbitTemplate.convertAndSend("exchange", "order", order);
Invoice invoice = new Invoice(...);
rabbitTemplate.convertAndSent("exchange", "invoice", invoice);

如果您的启动应用程序实现了 RabbitListenerConfigurer,那么您可以像这样配置一切

@SpringBootApplication
public class MyApplication implements RabbitListenerConfigurer {
   // other config stuff here....

    @Bean("queue1")
    public Queue queue1() {
        return new Queue("queue-orders", true);
    }

    @Bean("queue2")
    public Queue queue2() {
        return new Queue("queue-invoices", true);
    }

    @Bean
    public Binding binding1(@Qualifier("queue1") Queue queue, TopicExchange exchange) {        
        return BindingBuilder.bind(queue).to(exchange).with("invoice");
    }

    @Bean
    public Binding binding2(@Qualifier("queue2") Queue queue, TopicExchange exchange) {        
        return BindingBuilder.bind(queue).to(exchange).with("order");
    }

    @Bean
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(consumerJackson2MessageConverter());
        return factory;
    }

    @Override
    public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
    }

    // Exchange.
    @Bean
    public TopicExchange exchange() {
        return new TopicExchange("exchange");
    }
}

希望得到答复。

关于java - RabbitMQ 队列和路由键,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50234800/

相关文章:

java - 考虑某个字段,从列表中删除对象重复项

spring - 在运行时配置 Spring HTTP Security

python - 尝试连接到 RabbitMQ 时出现 IncompatibleProtocolError

django - RabbitMQ/Celery/Django 内存泄漏?

scala - Prismarabbitmq 在 kubernetes 上的部署不起作用

java - eclipse : Import maven project into current workspace location

java - 如何使用 Mockito 模拟文件静态方法

java - 正则表达式去除所有方括号,除了特定前缀之后的方括号

java - 如何保证一个bean只被Spring实例化一次

java - 不满意的依赖异常 : Error creating bean with name "empController"