我正在尝试将 celery 任务设置为 Django 站点的一部分,该站点由运行处理管道的多个步骤的 celerybeat 调用。
第一步下载数据文件,第二步绘制这些文件中的数据,第三步将这些图切割成用于谷歌地图的图 block 。
第一步只是一个任务,但第二步和第三步是一组任务,因为每个数据文件都有几十个图。
我想做的是让 celerybeat 调用类似这样的任务
@shared_task
def pipeline():
job = chain(download.s(), get_plot_task_group().s(), spacer.s(), tile.s())
result = job.apply_async()
return result
celery 在平铺之前等待绘图组完成组中的所有任务需要间隔符,如下所述:celery - chaining groups and subtasks. -> out of order execution
但问题是 get_plot_task_group() 需要有关数据文件的信息来构建任务组,但 celery 会立即执行它,而不是在 download.s() 完成后执行。有没有办法告诉 celery 等待调用该函数?
我还尝试让 get_plot_task_group 本身成为一个返回组的任务,但我无法在链中或链完成后调用该组。我必须调用 result.get() 来获取组对象,而在任务中调用 result.get() 是不好的做法。我也看不到使用回调来执行此操作的方法。
如果您有任何见解,我将不胜感激。
tl;dr:我必须在另一个任务中创建一组任务,然后运行它,但我不确定如何做。
最佳答案
将您的小组任务转换为和弦以实现这一目标。这是一个简单的例子。
@app.task
def dummy_task():
return 'I am a dummy task'
@app.task
def add(x, y):
return x + y
@app.task
def task_with_group_task():
task1 = add.si(1, 2)
group_task = group(add.si(i, i) for i in range(5))
task2 = chord(group_task, dummy_task.si())
task3 = add.si(9, 9)
pipeline = chain(task1, task2, task3)()
这里task2只在task1完成后才开始,task3只在task2完成后才开始。
注意:我不会将一项任务的结果传递给另一项任务。
关于python - 返回链中组的 celery 任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27100812/