java - 如何将错误消息移动到rabbitmq死信队列

标签 java rabbitmq spring-amqp spring-rabbit

我阅读了很多文档/stackoverflow,但在将消息移动到死信队列时发生异常时,我仍然遇到问题。我正在使用 spring-boot 这是我的配置:

@Autowired
    private RabbitTemplate rabbitTemplate;

    @Bean
    RetryOperationsInterceptor interceptor() {
        RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(rabbitTemplate, "error_exchange ", "error_key");
        return RetryInterceptorBuilder
            .stateless()
            .recoverer(recoverer)
            .build();
    }

死信队列:

Features    
x-dead-letter-routing-key:  error_key
x-dead-letter-exchange: error_exchange
durable:    true
Policy  DLX

队列名称:错误

我的交流: 名称:error_exchange 绑定(bind):到:错误,routing_key:error_key

这是我的消费者:

@RabbitListener(queues = "${rss_reader_chat_queue}")
    public void consumeMessage(Message message) {
        try {
            List<ChatMessage> chatMessages = messageTransformer.transformMessage(message);
            List<ChatMessage> save = chatMessageRepository.save(chatMessages);
            sendMessagesToChat(save);
        }
        catch(Exception ex) {
            throw new AmqpRejectAndDontRequeueException(ex);
        }
    }

因此,当我发送无效消息并发生某些异常时,它会发生一次(这很好,因为之前消息被一遍又一遍地发送)但消息不会进入我的死信队列。你能帮我解决这个问题吗?

最佳答案

您需要显示您的其余配置 - 启动属性、队列 @Bean 等。您似乎也对使用重新发布恢复器与死信队列之间存在一些混淆;它们是实现相似结果的不同方式。您通常不会同时使用两者。

这是一个简单的启动应用程序,演示了如何使用 DLX/DLQ...

@SpringBootApplication
public class So43694619Application implements CommandLineRunner {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(So43694619Application.class, args);
        context.close();
    }

    @Autowired
    RabbitTemplate template;

    @Autowired
    AmqpAdmin admin;

    private final CountDownLatch latch = new CountDownLatch(1);

    @Override
    public void run(String... arg0) throws Exception {
        this.template.convertAndSend("so43694619main", "foo");
        this.latch.await(10, TimeUnit.SECONDS);
        this.admin.deleteExchange("so43694619dlx");
        this.admin.deleteQueue("so43694619main");
        this.admin.deleteQueue("so43694619dlx");
    }


    @Bean
    public Queue main() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "so43694619dlx");
        args.put("x-dead-letter-routing-key", "so43694619dlxRK");
        return new Queue("so43694619main", true, false, false, args);
    }

    @Bean
    public Queue dlq() {
        return new Queue("so43694619dlq");
    }

    @Bean
    public DirectExchange dlx() {
        return new DirectExchange("so43694619dlx");
    }

    @Bean
    public Binding dlqBinding() {
        return BindingBuilder.bind(dlq()).to(dlx()).with("so43694619dlxRK");
    }

    @RabbitListener(queues = "so43694619main")
    public void listenMain(String in) {
        throw new AmqpRejectAndDontRequeueException("failed");
    }

    @RabbitListener(queues = "so43694619dlq")
    public void listenDlq(String in) {
        System.out.println("ReceivedFromDLQ: " + in);
        this.latch.countDown();
    }

}

结果:

ReceivedFromDLQ: foo

关于java - 如何将错误消息移动到rabbitmq死信队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43694619/

相关文章:

java - 如何在 while 循环中增加行计数器变量?

go - 永久 channel 终止程序并与 RabbitMQ 消费者一起进入 channel

php - 异常Eventlistener无法捕获RabbitMQ使用者的异常

java - 兔子预取

java - Spring AMQP RabbitMQ 实现优先级队列

java - spring amqp rabbitmq MessageListener 不工作

java - 无法绑定(bind)属性

java - 如何从方法返回接口(interface)

java - 使用 "with"关键字在 scala 中向上转换外部 java 库会产生类型不匹配错误

rabbitmq - 如何解析rabbitmq状态输出?