python - 确保 Celery 中不同来源的任务顺序

标签 python multithreading rabbitmq celery

可能会问你一个关于 Celery 的问题?

我有不同的作者每 X 分钟写一个任务。每个任务都需要完成同一作者的任务。该系统运行良好,X 分钟 >> 几秒钟即可完成任务。

但是,现在,可能会出现作者同时发送两三个任务的情况。很明显,Celery + RabbitMQ会将这个任务分配给不同的worker,造成麻烦。

我已经搜索过,但我发现了关于用锁阻塞一个工作人员直到另一个工作人员完成(例如使用 Redis)的回复,但这是不可能的,因为我的工作人员较少。

我需要为 N 个作者准备 N 个队列,并且 Celery 能够理解每个队列中的顺序。我将有数以千计的写入器,所以我无法创建那么多的 worker。

例子: A B C writers, A1, A2... tasks, and only one worker

我在“同一”时间收到 A1,A2,B1,C1,B2,C2,A3,B3,C3

Celery 应该创建队列 A (1-2-3) 乙 (1-2-3) C (1-2-3)

然后发送任务A1,那么,下一个,是不是A2,B1,C1并不重要,但不应该是A3,B2,B3,C2,C3。

希望我解释得很好

谢谢!

最佳答案

我认为您需要为每个队列创建一个工作人员来执行这样的排序。否则,worker 只会使用先进先出的方法来处理任务。您可以根据需要创建任意数量的队列,并配置每个工作人员从哪些队列接收消息。您可以在启动工作程序时传递 -Q 参数以设置其队列,如 Workers Guide 中所述。 .

celery -A my_project worker -l info -Q A

然后您可以设置全局映射,使用 Routing Guide 定义每个任务进入的队列。 .

CELERY_ROUTES = {
    'my_app.tasks.task_a1': {'queue': 'A'},
    'my_app.tasks.task_a2': {'queue': 'A'},
    'my_app.tasks.task_b1': {'queue': 'B'},
    'my_app.tasks.task_c1': {'queue': 'C'},
}

或者,您可以在提交每个任务实例时根据 Calling Tasks Guide 指定队列.

task_a1.apply_async(queue='A')

关于python - 确保 Celery 中不同来源的任务顺序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28650146/

相关文章:

python - 调整自适应阈值参数背后的直觉

Python编码格式

RabbitMQ - 在发布者端查找消息已被消费者确认

Symfony2 和 RabbitMqBundle。无法发布消息

python 列表连接

python - 如何在 Windows 上使用 iPython Notebook 解决 pandas 的导入错误?

c++ - std::queue 是否具有事件机制(std::queue 中的信号)

java - 执行者没有运行所有线程。

Java多线程和链表操作

rabbitmq - rabbitmq 心跳是如何工作的