java - 延迟处理死信队列

标签 java spring-boot rabbitmq spring-cloud-stream dead-letter

我想要执行以下操作:当消息失败并落入我的死信队列时,我想要等待 5 分钟并在我的队列中重新发布相同的消息。

今天,使用 Spring Cloud Streams 和 RabbitMQ,我做了以下代码 Based on this documentation :

@Component
public class HandlerDlq {

    private static final Logger LOGGER = LoggerFactory.getLogger(HandlerDlq.class);
    private static final String X_RETRIES_HEADER = "x-retries";
    private static final String X_DELAY_HEADER = "x-delay";
    private static final int NUMBER_OF_RETRIES = 3;
    private static final int DELAY_MS = 300000;
    private RabbitTemplate rabbitTemplate;

    @Autowired
    public HandlerDlq(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @RabbitListener(queues = MessageInputProcessor.DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer  retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = 0;
        }
        if (retriesHeader > NUMBER_OF_RETRIES) {
            LOGGER.warn("Message {} added to failed messages queue", failedMessage);
            this.rabbitTemplate.send(MessageInputProcessor.FAILED, failedMessage);
            throw new ImmediateAcknowledgeAmqpException("Message failed after " + NUMBER_OF_RETRIES + " attempts");
        }
        retriesHeader++;
        headers.put(X_RETRIES_HEADER, retriesHeader);
        headers.put(X_DELAY_HEADER, DELAY_MS * retriesHeader);
        LOGGER.warn("Retrying message, {} attempts", retriesHeader);
        this.rabbitTemplate.send(MessageInputProcessor.DELAY_EXCHANGE, MessageInputProcessor.INPUT_DESTINATION, failedMessage);
    }

    @Bean
    public DirectExchange delayExchange() {
        DirectExchange exchange = new DirectExchange(MessageInputProcessor.DELAY_EXCHANGE);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Binding bindOriginalToDelay() {
        return BindingBuilder.bind(new Queue(MessageInputProcessor.INPUT_DESTINATION)).to(delayExchange()).with(MessageInputProcessor.INPUT_DESTINATION);
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(MessageInputProcessor.FAILED);
    }
}

我的MessageInputProcessor接口(interface):

public interface MessageInputProcessor {

    String INPUT = "myInput";

    String INPUT_DESTINATION = "myInput.group";

    String DLQ = INPUT_DESTINATION + ".dlq"; //from application.properties file

    String FAILED = INPUT + "-failed";

    String DELAY_EXCHANGE = INPUT_DESTINATION + "-DlqReRouter";

    @Input
    SubscribableChannel storageManagerInput();

    @Input(MessageInputProcessor.FAILED)
    SubscribableChannel storageManagerFailed();
}

还有我的属性文件:

#dlx/dlq setup - retry dead letter 5 minutes later (300000ms later)
spring.cloud.stream.rabbit.bindings.myInput.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.myInput.consumer.republish-to-dlq=true
spring.cloud.stream.rabbit.bindings.myInput.consumer.dlq-ttl=3000
spring.cloud.stream.rabbit.bindings.myInput.consumer.delayedExchange=true


#input
spring.cloud.stream.bindings.myInput.destination=myInput
spring.cloud.stream.bindings.myInput.group=group

使用此代码,我可以从死信队列中读取数据,捕获 header ,但无法将其放回队列(行 LOGGER.warn("Retrying message, {} attempts", retriesHeader) ; 只运行一次,即使我把时间放得很慢)。

我的猜测是,bindOriginalToDelay 方法将交换绑定(bind)到一个新队列,而不是我的。但是,我没有找到一种方法将我的队列绑定(bind)到那里,而不是创建一个新队列。但我什至不确定这是错误。

我还尝试发送到 MessageInputProcessor.INPUT 而不是 MessageInputProcessor.INPUT_DESTINATION,但它没有按预期工作。

此外,不幸的是,由于对项目的依赖,我无法更新 Spring 框架...

您能帮我在一段时间后将失败的消息放回到我的队列中吗?我真的不想在那里放一个 thread.sleep...

最佳答案

通过该配置,myInput.group 绑定(bind)到具有路由键 # 的延迟(主题)交换 myInput

您可能应该删除 spring.cloud.stream.rabbit.bindings.myInput.consumer.delayedExchange=true 因为您不需要延迟主交换。

它还将通过键 myInput.group 绑定(bind)到您的显式延迟交换。

在我看来一切都是正确的;您应该看到绑定(bind)到两个交换器的相同(单个)队列:

enter image description here

myInput.group.dlq 通过键 myInput.group 绑定(bind)到 DLX

您应该设置更长的 TTL 并检查 DLQ 中的消息以查看是否有突出的内容。

编辑

我刚刚复制了您的代码,延迟了 5 秒,它对我来说效果很好(关闭了主交换上的延迟)。

Retrying message, 4 attempts

added to failed messages queue

也许您认为它不起作用是因为主交换也有延迟?

关于java - 延迟处理死信队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59363167/

相关文章:

java - 如何在 Spring Boot 中为每个用户设置速率限制?

java - 对 Spring Boot 的 Angular HTTP POST 请求导致空值

python - 在单线程 python 应用程序中一起使用 http 和 mqtt

java - Jersey :找不到媒体类型=多部分/表单数据的 MessageBodyReader

java - A* (A Star) 算法输出所有可能的解决方案

java - 如何在hibernate中查询map元素?

java - 如何在 Spring MVC 中使用带有延迟结果的超时?

queue - 删除 RabbitMQ 中的队列

ssl - RabbitMQ 集群算子 - 启用 MQTT 插件

java - 图形用户界面 : Highlight the Path It Takes for A Maze?