redis - 如何在任何 MQ 平台上实现这个单一并发分布式队列?

标签 redis queue message-queue amazon-sqs

我目前正在努力寻找实现特定类型队列的解决方案,这需要以下特征:

  1. 所有队列都必须遵循作业添加的顺序。
  2. 整个队列的并发度为 1,这意味着每个队列一次只会执行一个作业,而不是工作线程。
  3. 像这样的队列将会有数千人以上。
  4. 它需要分布式并且能够扩展(例如,如果我添加一个工作人员)

基本上它是一个单进程 FIFO 队列,这正是我在尝试不同的消息队列软件(如 ActiveMQ 或 RabbitMQ)时想要的,但是一旦我将其扩展到 2 个工作线程,它就不起作用了,因为在这种情况下我希望它能够扩展并保持与单进程队列完全相同的功能。下面我附上它如何在具有多个工作人员的分布式环境中工作的描述。

拓扑结构示例:(请注意,队列工作线程之间是多对多关系)

Distributed FIFO Queue

如何运行的示例:

+------+-----------------+-----------------+-----------------+
| Step | Worker 1        | Worker 2        | Worker 3        |
+------+-----------------+-----------------+-----------------+
| 1    | Fetch Q/1/Job/1 | Fetch Q/2/Job/1 | Waiting         |
+------+-----------------+-----------------+-----------------+
| 2    | Running         | Running         | Waiting         |
+------+-----------------+-----------------+-----------------+
| 3    | Running         | Done Q/2/Job/1  | Fetch Q/2/Job/2 |
+------+-----------------+-----------------+-----------------+
| 4    | Done Q/1/Job/1  | Fetch Q/1/Job/2 | Running         |
+------+-----------------+-----------------+-----------------+
| 5    | Waiting         | Running         | Running         |
+------+-----------------+-----------------+-----------------+

这可能不是最好的表示,但它表明,即使在队列 1队列 2 中,也有更多作业,但 Worker 3 确实有更多作业在上一个作业完成之前才开始获取下一个作业。

这就是我努力寻找好的解决方案。

我已经尝试了很多其他解决方案,例如rabbitMQ、activeMQ、apollo...这些允许我创建数千个队列,但是当我尝试时,所有这些都将使用worker 3来运行队列中的下一个作业。并发数是每个工作人员

是否有任何解决方案可以在任何 MQ 平台(例如 ActiveMQ、RabbitMQ、ZeroMQ 等)中实现这一点?

谢谢:)

最佳答案

您可以使用带有附加"dispatch"队列的 Redis 列表来实现此目的,所有工作人员 BRPOP 都在该队列上执行其作业。调度队列中的每个作业都标有原始队列 ID,当工作线程完成作业后,它会转到此原始队列并对调度队列执行 RPOPLPUSH 以使下一个作业可用于任何任务。其他 worker 。因此,调度队列最多有 num_queues 个元素。

您必须处理的一件事是当源队列为空时调度队列的初始填充。这可能只是发布者针对最初设置的每个队列的“空”标志进行的检查,并且当原始队列中没有剩余内容可供调度时也由工作人员设置。如果设置了此标志,发布者可以将第一个作业直接 LPUSH 到调度队列中。

关于redis - 如何在任何 MQ 平台上实现这个单一并发分布式队列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41979438/

相关文章:

javascript - 两个队列,一个并发1,另一个并发3

algorithm - 为什么我们要从队列中删除s?

SQL Server 服务代理 - 消息超时

python - 为什么使用 Celery 而不是 RabbitMQ?

perl - 如何避免存储过多的 session ?

c# - ServiceStack.Redis : PooledRedisClientManager creating way too many connections

Java Maze Solver - 我从来没有这么卡过

python - 是否可以清空 Gearman 服务器上的作业队列

python - 将 Celery 任务从 Redis 迁移到 RabbitMQ

redis - 实例化新的 redis-server (Linux)