django - 我应该如何在 celery 中实现任务集的回调

标签 django celery djcelery

问题

我使用 celery 启动如下所示的任务集:

  1. 我执行了一批可以并行运行的任务,这批任务的数量从数十到数千不等。
  2. 我将这些任务的结果聚合成一个答案,然后用这个答案做一些事情——比如存储到数据库,保存到特殊的结果文件等等。基本上,在任务执行完成后,我必须调用具有以下签名的函数:

    def callback(result_file_name, task_result_list): 
        #store in file
    
    
    def callback(entity_key, task_result_list):
        #store in db 
    

目前,第 1 步在 Celery 队列中完成,第 2 步在 celery 外部完成:

    tasks = []

    # add taksks to tasks list 

    task_group = group()
    task_group.tasks = tasks

    result = task_group.apply_async()

    res = result.join()

    # Aggregate results 

    # Save results to file, database whatever

这种方法很麻烦,因为我必须停止单个线程,直到执行所有任务(这可能需要几个小时)。

我也想以某种方式将步骤 2 移至 celery --- 本质上我需要向整个任务集添加一个回调(据我所知 Celery 不支持它)或提交一个在所有这些之后执行的任务子任务。

有人知道该怎么做吗?我在 django 环境中使用它,这样我就可以在数据库中存储一些状态。

总结我最近的发现

和弦不行

我不能直接使用和弦,因为和弦使我能够创建看起来像这样的回调:

    def callback(task_result_list): 
        #store in file

没有明显的方法可以将附加参数传递给回调(特别是因为这些回调不能是本地函数)。

使用数据库

我可以使用TaskSetMeta存储结果,但这个实体没有状态字段——所以即使我向TaskSetMeta添加信号,我也必须池化任务结果,这可能会产生巨大的开销。

最佳答案

答案非常简单,我确实可以使用和弦 --- 并且附加参数(如报告文件名等)必须作为 kwargs 传递。

这是和弦任务:

@task
def print_and_sum(to_sum, file_name):
    print file_name
    print sum(to_sum)
    return file_name, sum(to_sum)

以下是实例化它的方法:

subtasks = [...]
result = chord(subtasks)(print_and_sum.subtask(kwargs={'file_name' : 'report_file.csv'}))

关于django - 我应该如何在 celery 中实现任务集的回调,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10767691/

相关文章:

rabbitmq - 如何在 Celery 中设置每条消息的过期时间(TTL)?

java - 如何用另一种语言(不是 Python)编写 Celery worker/tasks?

http - 如何通过http在 celery 任务调用上设置http_headers

python - 如果其中一项任务失败,Celery 链就会中断

python - Apache 与 mod_wsgi 发出有关 django 中数据库的错误

django - 在一个 html 表单中处理多个模型表单

Django 在模型字段之间使用简单算术查询并与来自另一个模型的字段进行比较

python - 排除优先级较低的重复元素 (Django)

Django Celery beat 在开始时崩溃

python - djcelery、billiard 和 django_settings_module 的异常警告