multithreading - RabbitMQ - 每个路由键单个并发 worker

标签 multithreading rabbitmq message-queue rabbitmq-exchange

RabbitMQ 很新,我想看看我是否能用它实现我所需要的。

我正在寻找 Worker Queues 模式,但有一个警告。我希望每个路由键只有一个工作人员同时运行。

一个澄清的例子:

如果我按顺序发送以下带有路由键的消息:a , a , b , c ,我希望只有 3 个 worker 同时运行。当第一个a收到消息后,工作人员将其拾取并处理。

当下一个a收到消息并且之前的a消息仍在处理(未确认)新的 a消息应该在队列中等待。当bc收到消息,他们每个人都有一个工作人员处理它们。当第一个a消息被确认任何 worker 都可以接下一个a信息。

这种模式是否可以以自然的方式使用 RabbitMQ(无需在我这边编写任何应用程序代码来处理锁定和其他东西......)

编辑:

另一个澄清。所有工作人员都可以并且应该处理所有消息,并且我不希望每个工作人员都有一个队列,因为我想在他们之间分担负载,并且发布者不知道哪个工作人员应该处理消息。但我确实想确保没有 2 名工作人员同时处理共享相同 key 的消息。

例如,如果我有一个发布者使用 userId 发布消息。字段,我想确保没有 2 个 worker 正在处理具有相同 userId 的消息同时。

编辑 2

扩展 userId例子。假设我有一个发布者和 3 个 worker 。发布者发布如下消息:{ userId: 1, text: 'Hello' } , 有不同的 userId s。我的 3 个工作人员都对这些消息做同样的事情,所以我可以让他们中的任何一个处理进来的消息。但我想要实现的是只有一个工作人员同时处理来自某个用户的消息时间。如果 Worker 收到一 strip 有 userId 的消息1 并且仍在处理它,另一 strip 有 userId 的消息收到 1 我想确保没有其他 Worker 收到该消息。但其他消息以不同的userId s 应该由其他可用的 Worker 处理。
userId s 事先不知道,发布者不知道有多少工作人员或关于他们的任何具体信息,他只想安排消息进行处理。

最佳答案

路由键无法满足您的要求,而是通过一些设置内置于队列中。

如果您为 a 定义“queue_a”消息,“queue_b”,用于 b消息等,然后您可以让任意数量的消费者连接到它。

RabbitMQ 只会将给定消息传递给给定队列的单个消费者。

它与单个队列上的多个消费者一起工作的方式是消息的基本循环式调度。也就是说,第一条消息将被传递给一个消费者,而下一条消息(假设第一个消费者仍然很忙)将被传递给下一个消费者。

因此,这应该满足将消息传递给队列的任何给定使用者的需求。

为了确保您的消息有平等的机会到达任何消费者(并且并非一直都传递给同一个消费者),您应该进行一些其他设置。

首先,确保设置消息消费者no ack设置为 false (有时称为“自动确认”)。这将迫使您 ack来自您的代码的消息。

最后,将消费者的“消费者预取”限制设置为 1。

使用这种设置组合,单个消费者将检索单个消息并开始处理它。当该消费者工作时,队列中等待的任何消息都将被传递给其他消费者(如果有的话)。如果没有可用的消息,则消息将在队列中等待,直到有可用的消费者。

有了这个,您应该能够在给定的队列上实现您想要的行为。

...

请记住,这仅适用于队列。不能以这种方式管理路由 key 。来自交换的所有匹配的路由键都会导致将消息的副本发送到目标队列。

关于multithreading - RabbitMQ - 每个路由键单个并发 worker ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38940411/

相关文章:

c++ - 同步工作线程

linux - 如何在 ELK 的 rabbitmq 中使用 guest 以外的用户?

apache-kafka - Kafka如何处理仅由一个消费者发出的消息?

Azure Durable Functions 作为消息队列

c++ - 杀死正在执行 memcpy 的线程是否安全?

c++ - 如何从开始在对象的成员函数中执行的 c++ boost thread_group 创建新线程?

java - Thread VS RabbitMQ Worker资源消耗

go - 如何按名称将任务发布到 Go 中的 RabbitMQ 队列?

java - RabbitMQ 消息丢失

c# - WCF 服务在单个线程中处理传入请求,尽管有 ConcurrencyMode.Multiple