spring - 使用Spring AMQP Java配置为每个队列配置专用的监听器容器

标签 spring rabbitmq spring-amqp

我有这样的XML配置的侦听器

<rabbit:listener-container connection-factory="connectionFactory" concurrency="1" acknowledge="manual">
    <rabbit:listener ref="messageListener" queue-names="${address.queue.s1}" exclusive="true"/>
    <rabbit:listener ref="messageListener" queue-names="${address.queue.s2}" exclusive="true"/>
    <rabbit:listener ref="messageListener" queue-names="${address.queue.s3}" exclusive="true"/>
    <rabbit:listener ref="messageListener" queue-names="${address.queue.s4}" exclusive="true"/>
    <rabbit:listener ref="messageListener" queue-names="${address.queue.s5}" exclusive="true"/>
    <rabbit:listener ref="messageListener" queue-names="${address.queue.s6}" exclusive="true"/>
</rabbit:listener-container>


我正在尝试将其移至Java配置,但没有看到一种将多个MessageListener添加到ListenerContainer的方法。在我的情况下,创建多个ListenerContainer bean不是一个选择,因为我不知道从运行时开始消耗的队列数量。队列名称将来自配置文件。

我做了以下

@PostConstruct
public void init() 
{
    for (String queue : queues.split(","))
    {
        // The Consumers would not connect if I don't call the 'start()' method.
        messageListenerContainer(queue).start();
    }
}

@Bean
public SimpleMessageListenerContainer messageListenerContainer(String queue)
{
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
    container.setQueueNames(queue);
    container.setMessageListener(messageListener());

    // Set Exclusive Consumer 'ON'
    container.setExclusive(true);

    // Should be restricted to '1' to maintain data consistency.
    container.setConcurrentConsumers(1);
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return container;
}


它“分类”了一些工作,但我看到了一些奇怪的行为,其中打开了许多幻影通道,而这在XML配置中是从未发生过的。因此我怀疑自己做错了什么。我想知道在Java配置中创建MessageListenerContainers的正确方法吗?简而言之,“ Spring如何将具有多个'rabbit:listener'的'rabbit:listener-container'正确转换为Java对象?”任何帮助/见解对此将不胜感激。

商业案例
我们有一个发布者,用于发布用户配置文件更新。发布者可以分派多个更新以供相同用途,并且我们必须以正确的顺序处理它们,以维护数据存储中的数据完整性。

示例:用户:ABC,发布-> {UsrA:Change1,....,UsrA:Change 2,....,UsrA:Change 3}->使用者必须处理{UsrA:Change1,...., UsrA:更改2,....,UsrA:更改3}。

在我们之前的设置中,我们有1个队列来获取所有用户更新,并且我们有一个并发= 5的使用者应用程序。有多个应用程序服务器在运行使用者应用程序。这导致* 5 *“消费者应用程序的实例数”通道/线程*可以处理传入消息。速度很棒!但是我们经常发生乱序处理,导致数据损坏。

为了维持严格的FIFO顺序并仍尽可能并行地处理消息,我们实施了队列分片。我们有一个“ x-consistent-hash”,在employee-id上有一个哈希头。我们的发布者将消息发布到哈希交换,并且我们有多个分片队列绑定到哈希交换。这个想法是,我们将对a进行所有更改给定用户(例如用户A)在同一分片中排队,然后让我们的使用者以“独占”模式和“ ConcurrentConsumers = 1”连接到分片队列并处理消息。正确的顺序,同时仍并行处理消息。我们可以通过增加分片数量来使其更并行。

现在进入消费者配置

我们将消费者应用程序部署在多个应用程序服务器上。

原始方法:

如上所示,我只是在我的消费者应用程序的“ rabbit:listener-container”中添加了多个“ rabbit:listener”,除了服务器首先启动对所有分片队列的排他锁以及其他服务器的排他锁外,它的工作原理都很好服务器只是坐在那里不工作。

新的方法:

我们将分片队列名称移至应用程序配置文件。像这样

Consumer Instance 1 : Properties
queues=user.queue.s1,user.queue.s2,user.queue.s3

Consumer Instance 2 : Properties
queues=user.queue.s4,user.queue.s5,user.queue.s6


同样值得注意的是,我们可以有任意数量的Consumer实例,并且碎片可能会根据资源可用性在实例之间不均匀地分布。

将队列名称移到配置文件后,XML配置将不再起作用,因为我们无法像以前那样将“ rabbit:listener”动态添加到我的“ rabbit:listener-container”中。

然后,我们决定切换到Java配置。那就是我们被困的地方!

我们最初是这样做的

@Bean
    public SimpleMessageListenerContainer messageListenerContainer()
    {

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
        container.setQueueNames(queues.split(","));
        container.setMessageListener(messageListener());
        container.setMissingQueuesFatal(false);

        // Set Exclusive Consumer 'ON'
        container.setExclusive(true);

        // Should be restricted to '1' to maintain data consistency.
        container.setConcurrentConsumers(1);

        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.start();

        return container;
    }


它可以正常工作,但我们所有的队列都在一个共享1个通道的连接上。这不利于速度。我们想要的是一个连接,每个队列都有自己的通道。

下一步

这里还没有成功!我最初的问题中的java配置是我们现在的位置。

我感到困惑,为什么这样做是如此困难。显然,XML配置所做的事情在Java配置中并不是一件容易的事(或者至少对我来说是如此)。我认为这是一个需要填补的空白,除非我完全遗漏了一些东西。如果我错了,请纠正我。这是一个真正的业务案例,而不是一些虚构的案例。否则,请随时发表评论。

最佳答案

它可以正常工作,但我们所有的队列都在一个共享1个通道的连接上。这不利于速度。我们想要的是一个连接,每个队列都有自己的通道。


如果切换到DirectMessageListenerContainer,则该配置中的每个队列都会获得自己的Channel

See the documentation

要回答您的原始问题(预编辑):

@Bean
public SimpleMessageListenerContainer messageListenerContainer1(@Value("${address.queue.s1}") String queue)
{
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
    container.setQueueNames(queue);
    container.setMessageListener(messageListener());

    // Set Exclusive Consumer 'ON'
    container.setExclusive(true);

    // Should be restricted to '1' to maintain data consistency.
    container.setConcurrentConsumers(1);
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return container;
}

...

@Bean
public SimpleMessageListenerContainer messageListenerContainer6(@Value("${address.queue.s6}" ) String queue)
{
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(consumerConnectionFactory);
    container.setQueueNames(queue);
    container.setMessageListener(messageListener());

    // Set Exclusive Consumer 'ON'
    container.setExclusive(true);

    // Should be restricted to '1' to maintain data consistency.
    container.setConcurrentConsumers(1);
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return container;
}

关于spring - 使用Spring AMQP Java配置为每个队列配置专用的监听器容器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50938876/

相关文章:

java - 使用 mongo 响应式(Reactive) Spring 检查 Webflux 数据

java - 如何使 Spring testcontext 框架使用多个数据源?

java - Jetty 字符集 utf-8 与 字符集 UTF-8

rabbitmq - 为什么 MassTransit 只在一个线程上消费消息

redis - “排队”教程和文档?

spring-boot - Spring AMQP - 如何确认消息已成功传递和路由?

java - isAnonymous() 和 isAuthenticated() 都返回 false

rabbitmq - 将 sensu-client 连接到服务器时,AMQP 连接的 bad_header

java - 如何以编程方式将异步消费者订阅到 Spring AMQP 中的队列?

Spring AMQP - 是否有只等待确认的同步阻塞发送方法?