rabbitmq - AMQP basic.get 从队列中拉取并发消费者

标签 rabbitmq message-queue amqp

当使用 RabbitMQ 作为消息代理时,我有一个场景,多个并发消费者使用 basic.get AMQP 方法从队列中提取消息,并使用显式确认从队列中删除消息。假设以下设置

Q 有消息 M1、M2、M3 并且有消费者 C1、C2 和 C3(每个都有自己的连接和 channel )连接到它。

  • basic.get 方法中如何处理并发?对 basic.get 方法的调用是否同步以处理每个使用自己的连接和 channel 的并发使用者? C1、C2 和 C3 发出 basic.get 调用以同时接收消息(假设服务器同时接收所有 3 个请求)。
  • C1 使用 basic.get 请求消息并获取 M1。当 C2 请求消息时,由于它使用不同的连接,它是否再次获得 M1?
  • 消费者如何按预定义大小批量拉取消息?
  • 最佳答案

    您的问题确实触及了排队和流程理论的核心,所以我将从这个角度回答(就我的回答而言,RabbitMQ 确实是一个通用的消息代理,因为这适用于任何消息代理)。

    How is concurrency handled in the basic.get method? Is the call to basic.get method synchronized to handle concurrent consumers each using its own connection and channel? C1, C2 and C3 issue a basic.get call to receive a message at the same time (assume the server receives all 3 requests simultaneously).



    答案 1 : RabbitMQ 旨在成为一个可靠的消息代理。它包含内部流程和控制,以确保相同的消息不会多次传递给不同的消费者。现在,由于测试您描述的场景不切实际,它是否完美运行?谁知道。这就是为什么使用基于消息架构的正确设计的应用程序将使用幂等事务,这样如果多次处理同一个事务,结果将与事务处理一次相同。
    外卖 :设计您的应用程序,使这个问题的答案不重要。

    C1 requests a message using basic.get and gets M1. When C2 requests for a message, since its using a different connection, does it get M1 again?



    答案 2 : 不。根据我之前回答的假设,RabbitMQ 代理一旦传递了相同的消息,就不会返回相同的消息。根据 channel 和队列的设置,消息可能会在传递时自动确认并且永远不会重新传递。其他设置将使消息在处理线程/ channel “死亡”或来自处理线程的否定确认时自动重新排队。这是一项重要的功能,因为如果“毒药”消息可以提供给多个消费者,它可能会反复对您的应用程序造成严重破坏。 外卖 :在设计应用程序时,您可以放心地依赖此假设。

    How can consumers pull messages in batches of a predefined size?



    答案 : 他们不能,对他们来说也没有意义。在任何排队系统中,基本假设是从单个文件中的队列中删除项目。试图违反这一假设会导致不可预测的行为;此外,单件流通常是最有效的处理方法。但是,在现实世界中,有时需要批量大小 > 1。在这种情况下,将批处理加载到它自己的单个消息中是有意义的,因此这可能需要一个单独的处理线程来从队列中提取消息并将它们批处理在一起,或者最初将它们分批处理。请记住,一旦您有多个消费者,就无法保证按顺序处理单个消息。 外卖 :应尽可能避免批处理,但在无法避免的情况下,您不能假设批处理将包含任何特定顺序的单个消息。

    关于rabbitmq - AMQP basic.get 从队列中拉取并发消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27067833/

    相关文章:

    使用 AMQP (RabbitMQ) 作为 Apache Thrift RPC 传输层的 Python 库

    multithreading - 对具有极高消息速率的应用程序进行多线程处理(我应该使用哪种方法?)

    rabbitmq - RabbitMQ/AMQP 中的消息版本控制?

    jms - AMQP & Openwire - Activemq 经纪人和 2 个不同的消费者

    c# - 如何使用公共(public)交通检测与rabbitmq的连接问题?

    docker - 如何从 docker run 命令向主管传递环境变量

    python - 为什么我在使用 Erlang 19.1.1 的 .Net Client 到 RabbitMQ 时出现 SSL 握手失败,但在 17.4、18.1 和 18.2 中却没有?

    logging - 集中式网络日志记录 - 系统日志和替代方案?

    java - 如何使一个流程文件成为单独的预先存在的流程文件的子文件?

    python - 使用 Tornado 和 Pika 进行异步队列监控