python - Celery - 无序执行的链式任务

标签 python celery django-celery

我正在向我的 celery worker 发送一系列三个任务。第一个和第三个被添加到队列“filestore”,由工作线程 A 提供服务。第二个被添加到队列“cloud”,由工作线程 B 提供服务。

我想要的行为是三个任务按顺序执行,一个接一个。

我看到的行为是工作人员 A 执行任务 1,然后执行任务 3,然后工作人员 B 执行任务 2。

result = app.send_task(
                        "workerTasks_filestore.task_upload_scan_to_s3", args=[scan.scan_name], queue='filestore',
                        chain=[
                            Signature('workerTasks.do_processing_task', args=[scan.scan_name, spd_name], queue=queue, immutable=True),
                            Signature('workerTasks_filestore.task_download_scan_from_s3', args=[scan.scan_name], queue='filestore', immutable=True),
                        ]
)

我做错了什么?

最佳答案

您是否尝试使用 celery 中的链类?

from celery import chain, Signature

chained_tasks = chain([
    Signature('workerTasks_filestore.task_upload_scan_to_s3', args=(scan.scan_name,), queue='filestore'),
    Signature('workerTasks.do_processing_task', args=(scan.scan_name, spd_name,), queue=queue, immutable=True),
    Signature('workerTasks_filestore.task_download_scan_from_s3', args=(scan.scan_name,), queue='filestore', immutable=True)
])

result = chained_tasks.apply_async()

关于python - Celery - 无序执行的链式任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56461447/

相关文章:

python - 为什么错误比脚本本身花费的时间更长?

python - celery - errno 111 连接被拒绝

Django Celery 和多个数据库(Celery、Django 和 RabbitMQ)

python - 从操作系统的角度理解 python 扭曲的异步性

python - 在二维数组中随机放置n个元素

python - Celery Daemon 在 Centos 7 上不工作

python - django celery 中的定期任务无法正常工作

python - celery - 链接组和子任务。 -> 乱序执行

python - Django 模型继承覆盖字段属性中使用的变量

python - CELERYBEAT_SCHEDULE 在你的项目中放在哪里?