java - 如何以编程方式将异步消费者订阅到 Spring AMQP 中的队列?

标签 java spring spring-amqp

Spring AMQP 文档仅展示如何在应用程序初始化时使用 ~MessageListenerContainer 以编程方式订阅队列。

代码现在如下所示:

public void subscribe(EventType eventType, Object consumer) {
        Assert.notNull(eventType);
        Assert.notNull(eventType.toString());
        Assert.isTrue(!eventType.toString().isEmpty());

        Queue queue=new Queue("", false, true, true);
        Map<String, Exchange> beanMap=context.getBeansOfType(Exchange.class);
        if(beanMap!=null&&!beanMap.isEmpty()){
            Exchange exchange=null;
            boolean found=false;
            for(String key : beanMap.keySet()) {
                exchange=beanMap.get(key);
                if(getExchangeName(eventType.toString()).equals(exchange.getName())){
                    found=true;
                    break;
                }
            }
            if(found){
                amqpAdmin.declareQueue(queue);
                amqpAdmin.declareBinding(new Binding(queue.getName(), Binding.DestinationType.QUEUE, exchange.getName(), "", null));
                SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
                container.setConnectionFactory(connectionFactory);
                container.setQueueNames(queue.getName());
                container.setMessageListener(new MessageListenerAdapter(consumer));
            }else{
                //TODO
            }
        }
    }

由于 SimpleMessageListenerContainer 是一个生命周期 bean,因此除非在 @Configuration bean 中使用,否则它不会工作

我需要在运行时以编程方式创建订阅,以便 Bean 从队列中获取事件。知道如何做到这一点吗?

最佳答案

不清楚你的问题是什么。您可以在运行时创建容器;只需在配置后调用 start() 即可。有许多测试用例正是这样做的。

您还可以将队列添加到现有容器(并删除它们);容器的消费者将被回收并从新的队列列表开始消费。

关于java - 如何以编程方式将异步消费者订阅到 Spring AMQP 中的队列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27444901/

相关文章:

java - printf只在java中打印小数部分

java - 如何使用运行时加载的属性参数化 ehcache.xml?

java - Apache camel - 我使用 Spring XML 来定义我的 Camel 上下文。是否可以在运行时在 Java 中获取相同的 camelcontext 对象?

transactions - SimpleMessageListenerContainer ChannelTransacted 和 MaxConcurrentConsumers

java - 什么是 NullPointerException,我该如何解决?

java - 如何自动格式化java代码以添加大括号

java - 如何在 Primefaces 2.2.1 中为 JSF2 自定义终端组件?

java - 在 tomcat 中部署 Spring Boot war 时,application.yml 文件应该放在哪里?

java - RabbitMQ 推迟接收

java - 在 Spring AMQP 中根据消费者数量使用队列