java - 当使用RabbitMQ作为java工作队列时,应该如何处理并发和 transient 错误?

标签 java multithreading rabbitmq amqp

我正在考虑设置一个 RabbitMQ 代理来处理 Java Web 应用程序的基本任务处理。基本思想是生产者是一个 Web 服务器,它希望快速处理请求,但也需要进行一些数据处理。为了实现这一点,它知道一些可以序列化为 AMQP 消息的作业 DTO,并且某个地方有一个知道如何处理这些作业的使用者。经典。

阅读 RabbitMQ 文档后,我仍然有几个问题。

  1. 我希望消费者应用程序充分利用其 CPU 来处理消息,并且我想知道 RabbitMQ 是否已经提供了工作池。假设我想要 4 个工作线程,在同一个连接上注册 4 个 channel (每个 channel 有 1 个消费者)是否足够?在这种情况下,工作将在handleDelivery() 中完成,如果成功完成,将发送一个ack。或者更确切地说,我应该使用 1 个使用者并在我自己的应用程序层管理一组工作人员吗?

  2. 消费者将与数据库对话,这意味着会发生暂时性错误;死锁、乐观锁冲突、数据库服务器重新启动等。如果消费者无法处理作业,预期的行为是什么?发出 nack 并等待代理重新发送作业,或者延迟 ack 直到确定作业无法完成?还是没关系?

值得注意的是,除非数据库正在重新启动或其他原因,否则作业通常不会花费超过几秒钟的时间。我感谢任何指导!

最佳答案

1a。如果您想要固定数量的工作线程,那么您应该打开多个 channel 并为每个 channel 注册一个消费者。 RabbitMQ 每个 channel 都有一个调度程序线程,因此一个消费者将阻止在同一 channel 上注册的其他消费者。

1b。如果您需要动态增长的工作人员池,那么您可以考虑使用 spring-rabbit (假设您用 Java 实现)。具体来说,SimpleMessageListenerContainer 管理根据工作负载动态增长和收缩的消费者池。请参阅"Listeners Concurrency"部分在 Spring AMQP 引用指南中。

2a。如果您知道工作线程失败了,您可以进行 nack,消息将被传递给另一个消费者。除非是“自动确认”模式,否则在消费者确认消息之前,该消息不会传递给另一个消费者。

关于java - 当使用RabbitMQ作为java工作队列时,应该如何处理并发和 transient 错误?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22901822/

相关文章:

java - 运行我的应用程序时出现错误 "context initialization failed"

c# - 多个线程向一个列表添加元素。为什么列表中的项目总是比预期的少?

C++/MFC/ATL 线程安全字符串读/写

rabbitmq - AMQP basic.get 从队列中拉取并发消费者

java - 卡夫卡 : Publish message only if it doesn't already exist

java - 如何在java中将double转换为bigInteger

java - 如何在android中的我的自定义相机上添加自定义颜色效果

使用集合时线程安全的 C# 最佳实践(还不是并发的)

rabbitmq - 使用 rabbitmq 作为 celery 后端的正确配置

java - 有没有办法在监听器中获取路由 key