java - Spring RabbitListener 在发送消息后调用一个方法

标签 java spring rabbitmq spring-amqp spring-rabbit

在我的接收器处理了特定数量的消息后,我需要停止 Rabbit 监听器并调用一个方法,然后再次启用监听器。

@RabbitListener(...)
public void sink(Message msg) {
    processMsg();

    if (condition) {
        rabbitListenerEndpointRegistry.stop();
        doTask();
        rabbitListenerEndpointRegistry.start();
    }
}

不幸的是,如果我停止接收器方法内的监听器,事务将失败并且消息将返回到队列。我正在寻找一种在事务完成且监听器不持有任何消息后调用方法的方法。

  1. Receive a msg and process the msg
  2. Finish the transaction and release the msg <- I don't want to receive any new msgs after this point
  3. If the condition is satisfied stop the listener
  4. Do a long running task
  5. Start the listener

我无法转向手动事务管理,因为它需要对代码进行太多更改,并且在执行自定义任务时我无法保留任何消息,因为它是一个长时间运行的任务,并且我希望其他工作人员处理这段时间的消息。

Rabbit 工厂配置中的

setAfterReceivePostProcessorssetAdviceChain 在我的情况下不起作用,因为在这两种情况下,当监听器持有消息时都会调用该方法。

最佳答案

消息监听器容器似乎不适合您的应用程序;您需要一个“拉动”模型。

您可以使用 rabbitTemplate.receive()receiveAndConvert() 方法来按需拉取消息。

如果您在事务之外使用它们,消息将立即得到确认;如果您希望在流程完成后确认消息,请在事务中运行它。

关于java - Spring RabbitListener 在发送消息后调用一个方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58485434/

相关文章:

java - 为什么在我登录之前创建了一个新 session ?

java - Spring Security 5 根据 JWT 声明填充权限

rabbitmq - Celery(Django)限速

java - Seam 框架有多受欢迎

java - 如何获得写在一行中的两个数字 - 是否有 Split() 方法?

java - 如何在eclipse中导入java servlet项目?

java - Http 标签的 Spring 安全问题

java - 如何在 Java Spring MVC 创建时将大量数据作为文件提供?

websocket - 没有 SockJS 的 RabbitMQ Web STOMP

python - Celery 将任务路由到错误的工作服务器