java - How do I listen to dynamically created queues?

Tags: java rabbitmq amqp spring-amqp spring-rabbit

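I have a RabbitListener that continuously and asynchronously listens to the queue "user-messages" for user messages. Everything works fine unless the queue is loaded with a bulk batch of messages. When a batch is published to the queue, the messages of one user are processed first, so the other users' messages have to wait their turn.

I can't use a priority queue, because all users have the same priority. So I'd like to create new queues and listen to them at runtime. All of these queues will be short-lived: once its messages have been consumed, a queue will be deleted.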
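
To make the fairness problem concrete, here is a minimal plain-Java simulation of the idea, with no RabbitMQ involved. It assumes messages look like "user:payload" and that the consumer takes one message per user queue in turn; the class and method names are hypothetical, and the broker's real delivery order depends on prefetch and consumer settings, so treat this only as a model.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of per-user queues; every name here is hypothetical.
final class PerUserQueueDemo {

    /** Splits "user:payload" messages into one FIFO queue per user, keeping arrival order. */
    static Map<String, Deque<String>> splitByUser(List<String> messages) {
        Map<String, Deque<String>> queues = new LinkedHashMap<>();
        for (String message : messages) {
            String user = message.substring(0, message.indexOf(':'));
            queues.computeIfAbsent(user, u -> new ArrayDeque<>()).add(message);
        }
        return queues;
    }

    /** Takes one message from each non-empty queue per pass until every queue is empty. */
    static List<String> drainRoundRobin(Map<String, Deque<String>> queues) {
        List<String> processed = new ArrayList<>();
        boolean tookAny = true;
        while (tookAny) {
            tookAny = false;
            for (Deque<String> queue : queues.values()) {
                String next = queue.poll(); // null when this user's queue is empty
                if (next != null) {
                    processed.add(next);
                    tookAny = true;
                }
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        // A burst from alice arrives before single messages from bob and carol.
        List<String> arrivals = Arrays.asList("alice:1", "alice:2", "alice:3", "bob:1", "carol:1");
        // prints [alice:1, bob:1, carol:1, alice:2, alice:3]
        System.out.println(drainRoundRobin(splitByUser(arrivals)));
    }
}
```

With a single shared queue, bob and carol would wait behind all three of alice's messages; with one queue per user they are served in the first pass.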

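While browsing, I found that queues can be created dynamically with RabbitAdmin. But my questions are:

  1. How do I make my listener listen to new short-lived (TTL) queues that are created at runtime?
  2. How do I make the listener stop listening to deleted queues (after the TTL expires) to avoid exceptions?

Currently I'm using SimpleRabbitListenerContainerFactory. I'd also have no problem using DirectMessageListenerContainer. My only concern is how to tell the listener about dynamic queue creation and deletion. I'm considering the event exchange plugin (https://www.rabbitmq.com/event-exchange.html).

Does spring-amqp support starting/stopping listening on dynamic queues? Thanks in advance.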

    @Bean
    public SimpleRabbitListenerContainerFactory myRabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(config.getConnectionFactory());
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(3);
        return factory;
    }

    @RabbitListener(id = "listener", queues = {
            "#{receiver.queues()}" }, containerFactory = "myRabbitListenerContainerFactory")
    public void listen(QueueMessage message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag,
            MessageHeaders headers) {
         //process message
    }



Best answer

This author seems to do exactly that: https://karadenizfaruk28.medium.com/rabbitmq-dynamic-queue-add-and-listen-at-runtime-with-springboot-c7d42f0447c

Code from the link:

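  • RabbitMQ configuration
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

@Configuration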
public class RabbitMqConfiguration implements RabbitListenerConfigurer {
    @Autowired
    private ConnectionFactory connectionFactory;
    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    @Bean
    public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
        return new MappingJackson2MessageConverter();
    }
    @Bean
    public RabbitTemplate rabbitTemplate() {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
        return rabbitTemplate;
    }
    @Bean
    public RabbitAdmin rabbitAdmin() {
        return new RabbitAdmin(connectionFactory);
    }
    @Bean
    public RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry() {
        return new RabbitListenerEndpointRegistry();
    }
    @Bean
    public DefaultMessageHandlerMethodFactory messageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(consumerJackson2MessageConverter());
        return factory;
    }
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    @Override
    public void configureRabbitListeners(final RabbitListenerEndpointRegistrar registrar) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setPrefetchCount(1);
        factory.setConsecutiveActiveTrigger(1);
        factory.setConsecutiveIdleTrigger(1);
        factory.setConnectionFactory(connectionFactory);
        registrar.setContainerFactory(factory);
        registrar.setEndpointRegistry(rabbitListenerEndpointRegistry());
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
    }
}
  • Interface
public interface RabbitQueueService {
    void addNewQueue(String queueName,String exchangeName,String routingKey);
    void addQueueToListener(String listenerId,String queueName);
    void removeQueueFromListener(String listenerId,String queueName);
    Boolean checkQueueExistOnListener(String listenerId,String queueName);
}
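  • Service
import com.google.gson.Gson;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service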
@Log4j2
public class RabbitQueueServiceImpl implements RabbitQueueService {
    @Autowired
    private RabbitAdmin rabbitAdmin;
    @Autowired
    private RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry;

    @Override
    public void addNewQueue(String queueName, String exchangeName, String routingKey) {
        Queue queue = new Queue(queueName, true, false, false);
        Binding binding = new Binding(
                queueName,
                Binding.DestinationType.QUEUE,
                exchangeName,
                routingKey,
                null
        );
        rabbitAdmin.declareQueue(queue);
        rabbitAdmin.declareBinding(binding);
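        // Note: exchangeName is passed as the listener id, so a listener with that id must be registered.
        this.addQueueToListener(exchangeName, queueName);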
    }

    @Override
    public void addQueueToListener(String listenerId, String queueName) {
        log.info("adding queue : " + queueName + " to listener with id : " + listenerId);
        if (!checkQueueExistOnListener(listenerId,queueName)) {
            this.getMessageListenerContainerById(listenerId).addQueueNames(queueName);
            log.info("queue : " + queueName + " added to listener with id : " + listenerId);
        } else {
            log.info("given queue name : " + queueName + " already exists on given listener id : " + listenerId);
        }
    }

    @Override
    public void removeQueueFromListener(String listenerId, String queueName) {
        log.info("removing queue : " + queueName + " from listener : " + listenerId);
        if (checkQueueExistOnListener(listenerId,queueName)) {
            this.getMessageListenerContainerById(listenerId).removeQueueNames(queueName);
            log.info("deleting queue from rabbit management");
            this.rabbitAdmin.deleteQueue(queueName);
        } else {
            log.info("given queue name : " + queueName + " not exist on given listener id : " + listenerId);
        }
    }

    @Override
    public Boolean checkQueueExistOnListener(String listenerId, String queueName) {
        try {
            log.info("checking queueName : " + queueName + " exist on listener id : " + listenerId);
            log.info("getting queueNames");
            String[] queueNames = this.getMessageListenerContainerById(listenerId).getQueueNames();
            log.info("queueNames : " + new Gson().toJson(queueNames));
            if (queueNames != null) {
                log.info("checking " + queueName + " exist on active queues");
                for (String name : queueNames) {
                    log.info("name : " + name + " with checking name : " + queueName);
                    if (name.equals(queueName)) {
                        log.info("queue name exist on listener, returning true");
                        return Boolean.TRUE;
                    }
                }
                return Boolean.FALSE;
            } else {
                log.info("there is no queue exist on listener");
                return Boolean.FALSE;
            }
        } catch (Exception e) {
            log.error("Error on checking queue exist on listener");
            log.error("error message : " + ExceptionUtils.getMessage(e));
            log.error("trace : " + ExceptionUtils.getStackTrace(e));
            return Boolean.FALSE;
        }
    }

    private AbstractMessageListenerContainer getMessageListenerContainerById(String listenerId) {
        log.info("getting message listener container by id : " + listenerId);
        return ((AbstractMessageListenerContainer) this.rabbitListenerEndpointRegistry
                .getListenerContainer(listenerId)
        );
    }
}
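
For the deletion side of the question, one option the asker mentions is the event exchange plugin. The sketch below is a plain-Java stand-in, not Spring AMQP code: it assumes the plugin reports queue lifecycle events with the routing keys "queue.created" and "queue.deleted" (as I understand its documentation) and that the queue name has already been extracted from the event. The class QueueSubscriptionTracker and its methods are hypothetical; in a real application the two branches would delegate to addQueueNames and removeQueueNames on the listener container, as the service above does.

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a listener container: it only tracks queue names.
final class QueueSubscriptionTracker {

    private final Set<String> queueNames = ConcurrentHashMap.newKeySet();

    /** Applies one broker event and reports whether the subscription set changed. */
    boolean onBrokerEvent(String routingKey, String queueName) {
        switch (routingKey) {
            case "queue.created":
                // A real handler would call container.addQueueNames(queueName) here.
                return queueNames.add(queueName);
            case "queue.deleted":
                // A real handler would call container.removeQueueNames(queueName) here.
                return queueNames.remove(queueName);
            default:
                return false; // unrelated event, nothing to do
        }
    }

    /** Sorted snapshot of the queues currently subscribed to. */
    Set<String> subscribedQueues() {
        return new TreeSet<>(queueNames);
    }

    public static void main(String[] args) {
        QueueSubscriptionTracker tracker = new QueueSubscriptionTracker();
        tracker.onBrokerEvent("queue.created", "user-42");
        tracker.onBrokerEvent("queue.created", "user-43");
        tracker.onBrokerEvent("queue.deleted", "user-42");
        System.out.println(tracker.subscribedQueues()); // prints [user-43]
    }
}
```

Because add and remove on the set are idempotent, duplicate or late events do not need the separate existence check that checkQueueExistOnListener performs.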

Regarding "java - How do I listen to dynamically created queues?", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/59639632/
