java - 多个持久的听众。他们同时工作吗?

标签 java spring-boot activemq publish-subscribe spring-jms

我的配置:

@Bean
    public ActiveMQConnectionFactory connectionFactory(){
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(DEFAULT_BROKER_URL);
        return connectionFactory;
    }
 @Bean
    public DefaultMessageListenerContainer listenerContainers() {
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        //container.setConnectionFactory(connectionFactory1());
        container.setClientId("consumer1");
        container.setDestinationName(COMMENT_QUEUE);
        container.setPubSubDomain(true);
        container.setSessionTransacted(true);
        container.setSubscriptionDurable(true);
        container.setMessageListener(datafileSubscriber);
        container.start();
        return container;
    }

    @Bean
    public DefaultMessageListenerContainer listenerContainers1() {
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(connectionFactory());
        container.setClientId("consumer2");
        container.setDestinationName(COMMENT_QUEUE);
        container.setPubSubDomain(true);
        container.setSessionTransacted(true);
        container.setSubscriptionDurable(true);
        container.setMessageListener(datafileSubscriber);
        container.start();
        return container;
    } 

我需要将消息发布给多个监听器。所有监听器都执行相同的代码。我希望它们耐用。我已将 setsessiontransacted 设置为 true。这是一个发布/订阅模型。

我的想法是,如果一个监听器将执行代码。其他听众只需发送确认即可。这样他们就可以收到另一条消息。

我的假设是: 代理向两个监听器发送消息。其中一个立即确认,而另一个则处理它。
现在经纪人又收到了一条消息。由于第一个监听器没有发送确认,它会将消息发送给第二个监听器 它将消息放入第一个监听器的队列中,以便每当第一个监听器确认前一条消息时它就可以发送。

我的重要疑问: activemq 代理是否在没有所有监听器确认的情况下发送另一条消息?

我认为这个概念是每个监听器都会在代理中维护一个队列。当代理收到消息时,它将消息推送到 每个单独监听器的队列。如果监听器空闲,它将获取消息。如果正忙处理,直到发送确认为止,该消息 将留在队列中。确认后,下一条消息将传递给收听者。

我只是在我拥有的属性、持久订阅者、setsession 交易真实的情况下才这么说。

我尝试过但失败的事情。 我尝试将并发消费者属性设置为 2,并将其设置为持久订阅者。看起来如果它是一个持久订阅者,它需要一个 唯一的客户端 ID。因此,我转而使用具有并发消费者属性 1 的多个容器。

编辑: 我在这里所说的一切都是在我的配置上下文中,该配置使用持久订阅者、setsessiontransacted true 和相同的消息监听器

最佳答案

My assumption here: The broker sends a message to both the listeners. One of them acknowledges immediately, while the other processes it. Now the broker got another message. Since the 1st listener didnt send an acknowledgement, it will send the message to the 2nd listener and it puts the message in a queue for 1st listener so that it can send whenever the 1st listener acknowledges the previous message.

它根本不是这样工作的,消费者/订阅是相互独立的。每个用户都没有“队列”;只是主题;通过持久订阅,代理可以跟踪发送给消费者的最后一条消息;当所有持久订阅都收到该消息后,该消息将被删除。

向消费者发送消息的实际过程取决于其他因素,例如,ActiveMQ 支持预取(默认 1000),这意味着它将发送最多该数量的消息,而无需等待 ack。

您必须将 sessionTransacted 与 DMLC 一起使用,以便在监听器完成之前不会提交 ack。

concurrent consumer property to 2

正如我在回答您的其他问题时所说,从主题消费时增加并发性是没有意义的。

关于java - 多个持久的听众。他们同时工作吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46549249/

相关文章:

java - 如何向 Android 的 native 'Edit text' 上下文菜单添加选项

java - 检查参数时舍入

java - Chrome 在与 Java SSL 服务器握手时关闭连接

gradle bootRun > 使用测试类路径

java - 有没有办法在流式传输列表时检查条件

java - Spring 启动: IllegalStateException:Duplicate fragment name spring_web

java - 如何在管道分隔文件中找到未被管道包围的字符串?

java - ActiveMQ 死信队列未创建

activemq - 如何在 MassTransit 中为 ActiveMQ 设置唯一的队列名称?

ubuntu - 如何启用在 Ubuntu 上运行的 Apache ActiveMQ 实例?