我正在向我的 celery worker 发送一系列三个任务。第一个和第三个被添加到队列“filestore”,由工作线程 A 提供服务。第二个被添加到队列“cloud”,由工作线程 B 提供服务。
我想要的行为是三个任务按顺序执行,一个接一个。
我看到的行为是工作人员 A 执行任务 1,然后执行任务 3,然后工作人员 B 执行任务 2。
result = app.send_task(
"workerTasks_filestore.task_upload_scan_to_s3", args=[scan.scan_name], queue='filestore',
chain=[
Signature('workerTasks.do_processing_task', args=[scan.scan_name, spd_name], queue=queue, immutable=True),
Signature('workerTasks_filestore.task_download_scan_from_s3', args=[scan.scan_name], queue='filestore', immutable=True),
]
)
我做错了什么?
最佳答案
您是否尝试使用 celery 中的链类?
from celery import chain, Signature
chained_tasks = chain([
Signature('workerTasks_filestore.task_upload_scan_to_s3', args=(scan.scan_name,), queue='filestore'),
Signature('workerTasks.do_processing_task', args=(scan.scan_name, spd_name,), queue=queue, immutable=True),
Signature('workerTasks_filestore.task_download_scan_from_s3', args=(scan.scan_name,), queue='filestore', immutable=True)
])
result = chained_tasks.apply_async()
关于python - Celery - 无序执行的链式任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56461447/