python - 在 Celery 中如何更新主任务的状态直到他的所有子任务完成?

标签 python django celery

在 Celery 中,我正在运行一项主任务,该任务针对从查询中获得的每一项运行一个子任务。子任务应该并行运行。在 UI 上,我有一个进度条,显示总共完成了多少子任务。我正在更新主要任务状态以将信息提供给进度条。我的问题是主任务在将所有子任务推送给代理后立即结束,所以我无法再更新他的状态。我希望主任务可以等到所有子任务都完成。是否可以?还有其他解决方案吗?这是我的伪代码(真实代码不使用全局 ;-))。

total = 0
done = 0

@task(ignore_result=True)
def copy_media(path):
    global total, done
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    documents = Document.objects.all()
    total = documents.count()
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    for document in documents:
        process_doc.delay(document, path, copy_media)

@task(ignore_result=True)
def process_doc(document, path, copy_media):
    global total, done
    # Do some stuff
    done += 1
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})

最佳答案

我找到了一种使用 TaskSet 的方法。但我并不完全满意,因为我不能忽略子任务的结果。如果我忽略 process_doc 任务 results.ready() 的结果,总是返回 Falseresults.completed_count()总是返回 0,等等。这是代码:

@task(ignore_result=True)
def copy_media(path):
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    documents = Document.objects.all()
    total = documents.count()
    copy_media.update_state(state=STARTED, meta={'total': total, 'done': done})
    job = TaskSet(tasks=[process_doc.subtask((document, path))
                         for document in documents])
    results = job.apply_async()
    doc_name = ''
    while not results.ready():
        done = results.completed_count()
        if done:
            last = done - 1
            for idx in xrange(last, -1, -1):
                if results[idx].ready():
                    doc_name = results[idx].result
                    break
        copy_media.update_state(state=STARTED, meta={'total': total, 'done': done, 'doc-name': doc_name})
        time.sleep(0.25)

@task()
def process_doc(document, path):
    # Do some stuff
    return document

关于python - 在 Celery 中如何更新主任务的状态直到他的所有子任务完成?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10046486/

相关文章:

Python:使用 xlrd 和 pandas 解析 .xls 文件失败

django - 使用 Django Treebeard 获取祖先时如何防止 N+1 查询?

python - 在 Celery 任务中获取 task_id

django - 我在使用 Celery、Redis 和 Django 时遇到问题

python - 这个类如何在不实现 "__iter__"的情况下实现 "next"方法?

python - 我有 10000 个矢量形式的图像,如何将其转换为我的卷积神经网络?

python - Django 返回属性 if not null else 返回方法结果

django - Celery beat 进程在启动时分配大量内存

python - R sparklyr 包作为 Spark 的前端有多快?

django - 测试 IntegrityError UNIQUE 约束失败