RabbitMQ/AMQP : multiple queues, 单消费者

标签 rabbitmq amqp node-amqp

我想创建一个消费者来处理来自多个可变数量的源的消息,这些源是动态连接或断开连接的。

我需要的是每个消费者优先处理每个源的前 N ​​条消息 .然后运行多个消费者以提高速度。

我一直在阅读 Work queues 的文档, RoutingTopics ,以及许多其他文档,但没有确定如何实现这一点。我也做了一些没有运气的测试。

有人可以指出我该怎么做或在哪里阅读它吗?

- 编辑 -

队列A-----A3--A2--A1-┐

队列B-----B3--B2--B1-┼------ 消费者

队列C-----C3--C2--C1-┘

预期的效果是每个消费者都获得每个队列的第一条消息。例如:A1、B1、C1、A2、B2、C2、A3、B3、C3等。如果创建了一个新队列 (QueueD),消费者将开始以相同的方式接收来自它的消息。

提前致谢

最佳答案

What I need is that each consumer prioritize first N messages of each source. Then to run multiple consumers to improve the speed.



我所知道的所有消息队列都只在队列本身内提供排序保证(Kafka 不是在队列级别而是在队列内的分区内提供排序保证)。但是,在这里您要求序列化多个队列。这在分布式系统环境中是不可能的。

为什么?因为如果这些队列有多个消费者,消息将以循环方式传递给队列的每个连接的消费者。

假设一个 prefetch_count=1并且有两个连接的消费者,说第一组消息传递如下:
  • A1、B1 和 C1 交付给消费者 1 (X)
  • A2、B2 和 C2 交付给消费者 2 (Y)

  • 现在,在分布式系统中,一切都是异步的,事情可能会出错。例如:

    如果 X 确认 A1,A3 将被传递给 X。但是如果 Y 在 X 之前确认 A2,A3 将被传递给 Y。

    在分布式系统中,谁先确认不在您的控制范围内。考虑以下场景:
  • X 可能不得不等待 I/O 或 CPU 密集型任务,而 Y 可能很幸运,因为它不必等待。然后 Y 将遍历队列中的消息。
  • 或者 Y 被杀死(一个分区)或 n/w 变慢,然后 X 将继续消耗队列。

  • 我强烈建议您重新考虑您的要求,并在异步上下文中考虑您的预期保证(否则您不会考虑 MoM,对吗?)。

    PS:可以使用一些消费者端逻辑来实现您的要求(性能/吞吐量会受到影响)。
  • 单个消费者必须连接到所有队列
  • 在确认消息之前等待来自每个队列的消息。
  • 收到来自每个队列的消息后,将它们分组为一条消息并发布到另一个队列 (P)。
  • 现在许多消费者可以订阅 P 来处理有序的消息组。

  • 我不建议这样做,但是嘿,这是你的系统,谁会阻止你;)

    关于RabbitMQ/AMQP : multiple queues, 单消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50140623/

    相关文章:

    javascript - Node.js 在脚本之间传输数据

    java - 编写无代理 AMQP 到 MQTT 适配器

    node.js - meteor + Node -amqp : unable to connect via SSL to RabbitMQ server

    python - 测量 Celery 任务执行时间

    django - celery :无法连接到rabbitmq

    docker - 在 docker 镜像中启用 rabbit mq 服务器的日志记录

    java - 将 AMQP 与 JTA 结合使用

    java - RabbitMQ Spring 模板在发送 15 条消息后抛出尝试使用关闭 channel

    node.js - 使用 node.js amqp 模块时如何将 AQMP 消息缓冲区转换为 JSON 对象?

    queue - 队列发送者如何知道消费者崩溃了?