python - 返回链中组的 celery 任务

标签 python task celery chain

我正在尝试将 celery 任务设置为 Django 站点的一部分,该站点由运行处理管道的多个步骤的 celerybeat 调用。

第一步下载数据文件,第二步绘制这些文件中的数据,第三步将这些图切割成用于谷歌地图的图 block 。

第一步只是一个任务,但第二步和第三步是一组任务,因为每个数据文件都有几十个图。

我想做的是让 celerybeat 调用类似这样的任务

@shared_task
def pipeline():
    job = chain(download.s(), get_plot_task_group().s(), spacer.s(), tile.s())
    result = job.apply_async()
    return result

celery 在平铺之前等待绘图组完成组中的所有任务需要间隔符,如下所述:celery - chaining groups and subtasks. -> out of order execution

但问题是 get_plot_task_group() 需要有关数据文件的信息来构建任务组,但 celery 会立即执行它,而不是在 download.s() 完成后执行。有没有办法告诉 celery 等待调用该函数?

我还尝试让 get_plot_task_group 本身成为一个返回组的任务,但我无法在链中或链完成后调用该组。我必须调用 result.get() 来获取组对象,而在任务中调用 result.get() 是不好的做法。我也看不到使用回调来执行此操作的方法。

如果您有任何见解,我将不胜感激。

tl;dr:我必须在另一个任务中创建一组任务,然后运行它,但我不确定如何做。

最佳答案

将您的小组任务转换为和弦以实现这一目标。这是一个简单的例子。

@app.task
def dummy_task():
    return 'I am a dummy task'

@app.task
def add(x, y):
    return x + y

@app.task
def task_with_group_task():
    task1 = add.si(1, 2)
    group_task = group(add.si(i, i) for i in range(5))
    task2 = chord(group_task, dummy_task.si())
    task3 = add.si(9, 9)

    pipeline = chain(task1, task2, task3)()

这里task2只在task1完成后才开始,task3只在task2完成后才开始。

注意:我不会将一项任务的结果传递给另一项任务。

关于python - 返回链中组的 celery 任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27100812/

相关文章:

python - Dropbox webhook,检测已删除的文件

python - 如何将 GEOS MultiLineString 转换为多边形?

django - docker-compose 问题 - Celery 容器无法访问 DB 容器

django - 将 Celery 任务日志直接保存到单个文件

python - 具有均值和计数的条件分组

c++ - 如何枚举/列出 Windows XP 中所有已安装的应用程序?

c# - Java 的 ForkJoinTask<V> 的 .NET 等价物是什么?

c# 检查线程/后台任务是否已完成或需要中止

c# - ContinueWhenAll 不等待所有任务完成

django - 如何在Docker Compose中设置 `celeryd`和 `celerybeat`?