python - How to recursively chain Celery tasks that return lists into groups?

Tags: python celery

I started from this question: How to chain a Celery task that returns a list into a group?

But I want to fan out twice. So in my use case I have:

  • Task A: determine the total number of items for a given date
  • Task B: download 1000 metadata entries for that date
  • Task C: download the content of one item

So at each step I fan out the number of items for the next step. I could do this by looping over the results inside a task and calling .delay() on the next task function (roughly as sketched below), but I decided to try to keep my main tasks from doing that. Instead, they return a list of tuples - each tuple is then expanded into the arguments for a call to the next function.
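That loop-and-delay approach would look roughly like this (a sketch only - download_metadata, download_item and fetch_metadata_page are hypothetical names, not my real code):

from .celery import app

@app.task
def download_metadata(date):
    # hypothetical sketch: fan out by calling .delay() in a loop
    for item_args in fetch_metadata_page(date):  # stand-in for the metadata fetch
        download_item.delay(*item_args)          # queue the next level directly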

The answer to the question above seemed to cover what I need, but I couldn't figure out the right way to chain it for a two-level fan-out.

Here is a very stripped-down example of my code:

from celery import group
from celery.task import subtask
from celery.utils.log import get_task_logger

from .celery import app

logger = get_task_logger(__name__)

@app.task
def task_range(upper=10):
    # wrap in list to make JSON serializer work
    return list(zip(range(upper), range(upper)))

@app.task
def add(x, y):
    logger.info(f'x is {x} and y is {y}')
    char = chr(ord('a') + x)
    char2 = chr(ord('a') + x*2)
    result = x + y
    logger.info(f'result is {result}')
    return list(zip(char * result, char2 * result))

@app.task
def combine_log(c1, c2):
    logger.info(f'combine log is {c1}{c2}')

@app.task
def dmap(args_iter, celery_task):
    """
    Takes an iterator of argument tuples and queues them up for celery to run with the function.
    """
    logger.info(f'in dmap, len iter: {len(args_iter)}')
    callback = subtask(celery_task)
    run_in_parallel = group(callback.clone(args) for args in args_iter)
    return run_in_parallel.delay()
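The key line is callback.clone(args): for a plain task signature, clone() makes a copy with the tuple bound as positional arguments, so each member of the group gets its own arguments. As an illustration (not part of my real code):

sig = add.s()              # unbound signature
bound = sig.clone((3, 4))  # roughly equivalent to add.s(3, 4)
print(bound.args)          # -> (3, 4)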

Then I tried various things to make my nested mapping work. First, one level of mapping works fine:

pp = (task_range.s() | dmap.s(add.s()))
pp(2)

produced the result I expected, so I wasn't completely off track.
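For reference, with upper=2 the range task returns [(0, 0), (1, 1)], so the group runs add(0, 0) (which returns []) and add(1, 1) (which returns [('b', 'c'), ('b', 'c')]).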

But when I try to add another level:

ppp = (task_range.s() | dmap.s(add.s() | dmap.s(combine_log.s())))

then in the worker I see this error:

[2019-11-23 22:34:12,024: ERROR/ForkPoolWorker-2] Task proj.tasks.dmap[e92877a9-85ce-4f16-88e3-d6889bc27867] raised unexpected: TypeError("add() missing 2 required positional arguments: 'x' and 'y'",)
Traceback (most recent call last):
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/app/trace.py", line 385, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/app/trace.py", line 648, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/hdowner/dev/playground/celery/proj/tasks.py", line 44, in dmap
    return run_in_parallel.delay()
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/canvas.py", line 186, in delay
    return self.apply_async(partial_args, partial_kwargs)
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/canvas.py", line 1008, in apply_async
    args=args, kwargs=kwargs, **options))
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/canvas.py", line 1092, in _apply_tasks
    **options)
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/canvas.py", line 578, in apply_async
    dict(self.options, **options) if options else self.options))
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/canvas.py", line 607, in run
    first_task.apply_async(**options)
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/canvas.py", line 229, in apply_async
    return _apply(args, kwargs, **options)
  File "/home/hdowner/.venv/play_celery/lib/python3.6/site-packages/celery/app/task.py", line 532, in apply_async
    check_arguments(*(args or ()), **(kwargs or {}))
TypeError: add() missing 2 required positional arguments: 'x' and 'y'

And I'm not sure why changing the argument to dmap() from a plain task signature to a chain changes how arguments get passed into add(). My impression was that it shouldn't - it should just mean that the return value of add() gets passed along. But apparently that's not the case...

Best answer

It turns out the problem is that the clone() method on a chain instance doesn't pass the arguments along at some point - see https://stackoverflow.com/a/53442344/3189 for the full details. If I use the method from that answer, my dmap() code becomes:

from copy import deepcopy  # needed by clone_signature() below

@app.task
def dmap(args_iter, celery_task):
    """
    Takes an iterator of argument tuples and queues them up for celery to run with the function.
    """
    callback = subtask(celery_task)
    run_in_parallel = group(clone_signature(callback, args) for args in args_iter)
    return run_in_parallel.delay()


def clone_signature(sig, args=(), kwargs=(), **opts):
    """
    Turns out that a chain clone() does not copy the arguments properly - this
    clone does.
    From: https://stackoverflow.com/a/53442344/3189
    """
    if sig.subtask_type and sig.subtask_type != "chain":
        raise NotImplementedError(
            "Cloning only supported for Tasks and chains, not {}".format(sig.subtask_type)
        )
    clone = sig.clone()
    if hasattr(clone, "tasks"):
        task_to_apply_args_to = clone.tasks[0]
    else:
        task_to_apply_args_to = clone
    args, kwargs, opts = task_to_apply_args_to._merge(args=args, kwargs=kwargs, options=opts)
    task_to_apply_args_to.update(args=args, kwargs=kwargs, options=deepcopy(opts))
    return clone
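To see what the fix changes, here is a sketch (not from the linked answer) of binding arguments to a chain with clone_signature() - the arguments land on the chain's first task instead of being dropped:

inner = add.s() | dmap.s(combine_log.s())  # a chain signature
bound = clone_signature(inner, (3, 4))     # args are applied to add, the chain's head
print(bound.tasks[0].args)                 # -> (3, 4)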

Then when I do:

ppp = (task_range.s() | dmap.s(add.s() | dmap.s(combine_log.s())))

everything works as expected.
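Invoked the same way as the single-level version:

ppp(2)  # task_range -> group of add() calls -> group of combine_log() calls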

The original question, "python - How to recursively chain Celery tasks that return lists into groups?", can be found on Stack Overflow: https://stackoverflow.com/questions/59013002/
