java - spring-rabbit 中每个主题的并发消费者

标签 java spring rabbitmq

spring-rabbit 可以支持单个主题上的多个并发消费者吗?

详细信息如下

我的系统使用手动确认模式,通过 spring-rabbit (Spring 4.0.6) 进行主题交换。模式如下:

  • 消息进入 ChannelAwareMessageListener
  • 工厂方法生成适当的工作线程并传入对 channel 的引用
  • 如果工作线程成功处理消息,则消息被确认
  • 如果工作进程不成功或发生异常,消息将被 Nack 并发送到死信队列以供稍后处理

由于其中一些工作人员可能需要相当多的时间来完成其 IO 绑定(bind)处理,因此我需要能够设置更多数量的并发消费者。

然而,经过一些测试,我注意到有时多个消费者会收到相同的消息。果然,查看文档( http://docs.spring.io/spring-framework/docs/4.0.6.RELEASE/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html#setConcurrentConsumers-int- )证实了我的发现:

Do not raise the number of concurrent consumers for a topic, unless vendor-specific setup measures clearly allow for it. With regular setup, this would lead to concurrent consumption of the same message, which is hardly ever desirable.

我的问题如下:

  1. “除非供应商特定的设置措施明确允许”到底是什么意思?是否有补丁/版本/配置或 Rabbit 支持此功能?
  2. 我可以轻松地在客户端上编写代码,以防止消息已被其他工作人员处理时被处理。那么,我该如何处理此消息呢?发送nack?忽略它?如果我 nack,然后实际处理消息的工作人员在一段时间后发送 ack 会发生什么?会抛出异常吗?

提前致谢...

最佳答案

您提到的警告是关于 JMS 而不是 RabbitMQ。看看Spring RabbitMQ documentation 。该文档不包含此警告。

一旦消息被传递到队列(无论交换类型如何),消费者/工作人员一次只能一次获取该消息(假设没有问题)。

如果您两次收到相同的消息,则说明存在问题:

  • 消息被确认并重新排队
  • 客户端关闭 channel /连接
  • 存在网络问题,Rabbit 自动重新排队消息(服务器和客户端上的 channel /连接已关闭)

对于最后两点,您应该会收到一些错误消息。

请注意,我认为这一点是不必要的,并且可以解释问题:

  • A factory method generates an appropriate worker and passes in a reference to the channel

SimpleMessageListenerContainer 已使用Executor。当您使用自己的执行器时,spring-amqp channel 池(如果您使用任何)和您的执行器之间可能存在问题,例如。该 channel 已关闭,因为 spring-amqp 认为它不再使用。

不要生成自己的线程,而是在当前 ChannelAwareMessageListener#onMessage 线程的同一线程上处理消息。

关于java - spring-rabbit 中每个主题的并发消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29481992/

相关文章:

centos - 无法从 src 构建 rpm

java - 使用 Android 从图像中读取文本

java - 到底如何使用 java.swing.AbstractListModel 中的 "fireContentsChanged"方法?

java - Java中的方法不能直接返回值,为什么?

java - SpringBoot从2.1.1升级到2.1.2报错

java - 使用 Spring Integration 和 RabbitMQ 时如何在消费者端处理格式错误的消息

java - 在库项目内的 Activity 中使用 Android 通用图像加载器

java - Jetty URL 重写/重定向去除查询字符串

java - 如何在同一行上使用 map 和 orElseThrow?

python - 为什么 celery.control.inspect 报告的排队任务比 rabbitmqctl 少?