问题
我使用 celery 启动如下所示的任务集:
- 我执行了一批可以并行运行的任务,这批任务的数量从数十到数千不等。
我将这些任务的结果聚合成一个答案,然后用这个答案做一些事情——比如存储到数据库,保存到特殊的结果文件等等。基本上,在任务执行完成后,我必须调用具有以下签名的函数:
def callback(result_file_name, task_result_list): #store in file def callback(entity_key, task_result_list): #store in db
目前,第 1 步在 Celery 队列中完成,第 2 步在 celery 外部完成:
tasks = []
# add taksks to tasks list
task_group = group()
task_group.tasks = tasks
result = task_group.apply_async()
res = result.join()
# Aggregate results
# Save results to file, database whatever
这种方法很麻烦,因为我必须停止单个线程,直到执行所有任务(这可能需要几个小时)。
我也想以某种方式将步骤 2 移至 celery --- 本质上我需要向整个任务集添加一个回调(据我所知 Celery 不支持它)或提交一个在所有这些之后执行的任务子任务。
有人知道该怎么做吗?我在 django 环境中使用它,这样我就可以在数据库中存储一些状态。
总结我最近的发现
和弦不行
我不能直接使用和弦,因为和弦使我能够创建看起来像这样的回调:
def callback(task_result_list):
#store in file
没有明显的方法可以将附加参数传递给回调(特别是因为这些回调不能是本地函数)。
使用数据库
我可以使用TaskSetMeta
存储结果,但这个实体没有状态字段——所以即使我向TaskSetMeta添加信号,我也必须池化任务结果,这可能会产生巨大的开销。
最佳答案
答案非常简单,我确实可以使用和弦 --- 并且附加参数(如报告文件名等)必须作为 kwargs 传递。
这是和弦任务:
@task
def print_and_sum(to_sum, file_name):
print file_name
print sum(to_sum)
return file_name, sum(to_sum)
以下是实例化它的方法:
subtasks = [...]
result = chord(subtasks)(print_and_sum.subtask(kwargs={'file_name' : 'report_file.csv'}))
关于django - 我应该如何在 celery 中实现任务集的回调,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10767691/