python - Celery chord not waiting for subtasks (a group of chains)

Tags: python celery chain chord

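In my real-world case, I want to fetch a list of events from an API call and trigger a chain of functions for each event. Once all the chains have completed, I need to call a function that reports the results.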

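I've simplified it as much as I can and ended up with the following code. It runs, but the chord unlock function is called before the chains have finished. In this code, that means it cannot sum the results array.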

import time

from celery import Celery, chain, chord, group

app = Celery('tasks', broker='amqp://guest@localhost//', backend='amqp')


@app.task
def generate():
    return [1, 2, 3, 4, 5]


@app.task
def dmap(it, first, second):
    chains = []
    for arg in it:
        c = chain(first.clone([arg, ]), second)
        chains.append(c)

    return group(chains)()


@app.task
def add(x, y):
    print('add {x} {y}'.format(x=x, y=y))
    time.sleep(3)
    return x + y


@app.task
def mul(x, y):
    print('mul {x} {y}'.format(x=x, y=y))
    time.sleep(2)
    return x * y


@app.task
def xsum(numbers):

    print(numbers)
    to_sum = []
    for x in numbers[0]:
        to_sum.append(x.result)
    print(to_sum)

    return sum(to_sum)

if __name__ == '__main__':

    x = add.s(0)
    y = mul.s(1)

    workers = generate.si() | dmap.s(x, y)

    result = chord(workers)(xsum.s())
    print(result.get())

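The dmap function is based on this answer. I've also seen this answer. The last link implies that what I want to do may not be possible, because "when groups happen in parallel, there is nothing to synchronize on."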

I can't figure out how to make the solution work when the generate function returns an array rather than a single item.
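Stripped of Celery, dmap is a dynamic fan-out followed by a join: the number of chains is only known once generate() has run, so whatever waits for them has to be set up by the step that receives the list. In Celery terms, one approach often used for this (not shown in this thread, so treat it as an assumption) is for dmap to build the chord itself, e.g. `return chord(chains)(callback)` with a callback signature passed in, instead of returning `group(chains)()`. The stdlib sketch below shows the same shape with a thread pool standing in for workers; `run_chain` and the `callback` parameter are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    return x + y


def mul(x, y):
    return x * y


def run_chain(arg, first, second):
    """One 'chain': first(arg), then second(result), like add.s(0) | mul.s(1)."""
    return second(first(arg))


def dmap(items, first, second, callback, pool):
    """Fan out one chain per item, then call `callback` with every chain's result.

    The join lives inside dmap because only dmap knows how many chains it created.
    """
    futures = [pool.submit(run_chain, item, first, second) for item in items]
    # f.result() blocks until that chain has finished, so callback sees final values
    return callback([f.result() for f in futures])


with ThreadPoolExecutor(max_workers=4) as pool:
    total = dmap(
        [1, 2, 3, 4, 5],      # what generate() returns
        lambda x: add(x, 0),  # add.s(0) with the item prepended
        lambda x: mul(x, 1),  # mul.s(1) receiving the previous result
        sum,                  # plays the role of xsum
        pool,
    )

print(total)  # 15
```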

The log from running the code above shows the chord unlocking (early?), so xsum tries to sum a set of results, three of which are None:

[2014-11-11 14:03:10,308: INFO/MainProcess] Received task: tasks.generate[2eedc847-ff67-4e0c-90e1-48314133bb51]
[2014-11-11 14:03:10,311: INFO/MainProcess] Received task: celery.chord_unlock[7d07e506-1aae-40e5-bd05-bbc53b286103] eta:[2014-11-11 14:03:11.307477+00:00]
[2014-11-11 14:03:10,338: INFO/MainProcess] Received task: tasks.dmap[0f2efa72-402d-412e-807e-bbf191850c18]
[2014-11-11 14:03:10,365: INFO/MainProcess] Task tasks.generate[2eedc847-ff67-4e0c-90e1-48314133bb51] succeeded in 0.0523488249746s: [1, 2, 3, 4, 5]
[2014-11-11 14:03:10,386: INFO/MainProcess] Received task: tasks.add[eccf5faa-069c-4634-826e-af5793a11c68]
[2014-11-11 14:03:10,388: WARNING/Worker-2] add 1 0
[2014-11-11 14:03:10,390: INFO/MainProcess] Received task: tasks.add[6b66167b-2767-4bde-a0a0-32f5fab7a961]
[2014-11-11 14:03:10,392: WARNING/Worker-1] add 2 0
[2014-11-11 14:03:10,394: INFO/MainProcess] Received task: tasks.add[d74659b0-b512-44f9-88b4-1908f79bfc52]
[2014-11-11 14:03:10,397: INFO/MainProcess] Received task: tasks.add[e9b3336f-9b37-4f25-81a7-cbac819da38c]
[2014-11-11 14:03:10,398: INFO/MainProcess] Received task: tasks.add[63b2ce22-1288-4cac-9018-8ddefaab575d]
[2014-11-11 14:03:10,399: WARNING/Worker-4] add 3 0
[2014-11-11 14:03:10,401: INFO/MainProcess] Task tasks.dmap[0f2efa72-402d-412e-807e-bbf191850c18] succeeded in 0.061700456019s: <GroupResult: 9a3972ff-0976-46d2-937f-9ea4a1ead56b [925ec9c3-09da-43c1-9b94-c04dbe67f195,...
[2014-11-11 14:03:10,402: WARNING/Worker-3] add 4 0
[2014-11-11 14:03:13,409: INFO/MainProcess] Received task: tasks.mul[f696aa0a-844f-4e81-9722-0693c6e8c344]
[2014-11-11 14:03:13,410: INFO/MainProcess] Received task: tasks.mul[538c3c60-67f8-409d-b4ce-bf09184aa03b]
[2014-11-11 14:03:13,418: INFO/MainProcess] Received task: tasks.mul[4ffb6d04-0cf2-4300-a0de-bf53acf6662d]
[2014-11-11 14:03:13,419: INFO/MainProcess] Received task: tasks.mul[925ec9c3-09da-43c1-9b94-c04dbe67f195]
[2014-11-11 14:03:13,436: INFO/MainProcess] Task tasks.add[d74659b0-b512-44f9-88b4-1908f79bfc52] succeeded in 3.03667491797s: 3
[2014-11-11 14:03:13,437: INFO/MainProcess] Task tasks.add[e9b3336f-9b37-4f25-81a7-cbac819da38c] succeeded in 3.03460178198s: 4
[2014-11-11 14:03:13,438: INFO/MainProcess] Task tasks.add[6b66167b-2767-4bde-a0a0-32f5fab7a961] succeeded in 3.04608612298s: 2
[2014-11-11 14:03:13,439: WARNING/Worker-4] mul 4 1
[2014-11-11 14:03:13,450: WARNING/Worker-2] add 5 0
[2014-11-11 14:03:13,452: INFO/MainProcess] Task tasks.add[eccf5faa-069c-4634-826e-af5793a11c68] succeeded in 3.06420573901s: 1
[2014-11-11 14:03:13,454: WARNING/Worker-3] mul 3 1
[2014-11-11 14:03:13,481: INFO/MainProcess] Task celery.chord_unlock[7d07e506-1aae-40e5-bd05-bbc53b286103] succeeded in 0.0413383140112s: None
[2014-11-11 14:03:13,485: INFO/MainProcess] Received task: tasks.xsum[575f5375-bf0f-4d41-b9a3-57661eaf4373]
[2014-11-11 14:03:15,470: INFO/MainProcess] Task tasks.mul[f696aa0a-844f-4e81-9722-0693c6e8c344] succeeded in 2.031282346s: 4
[2014-11-11 14:03:15,472: WARNING/Worker-1] mul 1 1
[2014-11-11 14:03:15,477: INFO/MainProcess] Task tasks.mul[538c3c60-67f8-409d-b4ce-bf09184aa03b] succeeded in 2.02354899806s: 3
[2014-11-11 14:03:15,479: WARNING/Worker-4] [<GroupResult: 9a3972ff-0976-46d2-937f-9ea4a1ead56b [925ec9c3-09da-43c1-9b94-c04dbe67f195, 4ffb6d04-0cf2-4300-a0de-bf53acf6662d, 538c3c60-67f8-409d-b4ce-bf09184aa03b, f696aa0a-844f-4e81-9722-0693c6e8c344, 82a6b814-53a5-45f1-a0dc-43885f92eca4]>]
[2014-11-11 14:03:15,555: WARNING/Worker-4] [None, None, 3, 4, None]
[2014-11-11 14:03:15,564: ERROR/MainProcess] Task tasks.xsum[575f5375-bf0f-4d41-b9a3-57661eaf4373] raised unexpected: TypeError("unsupported operand type(s) for +: 'int' and 'NoneType'",)
Traceback (most recent call last):
  File "/home/duncan/VEnvs/adwords/local/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/duncan/VEnvs/adwords/local/lib/python2.7/site-packages/celery/app/trace.py", line 437, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/duncan/projects/celerychordtest/tasks.py", line 47, in xsum
    return sum(to_sum)
TypeError: unsupported operand type(s) for +: 'int' and 'NoneType'
[2014-11-11 14:03:16,460: INFO/MainProcess] Received task: tasks.mul[82a6b814-53a5-45f1-a0dc-43885f92eca4]
[2014-11-11 14:03:16,462: WARNING/Worker-3] mul 5 1
[2014-11-11 14:03:16,476: WARNING/Worker-2] mul 2 1
[2014-11-11 14:03:16,476: INFO/MainProcess] Task tasks.add[63b2ce22-1288-4cac-9018-8ddefaab575d] succeeded in 3.02716274199s: 5
[2014-11-11 14:03:17,480: INFO/MainProcess] Task tasks.mul[925ec9c3-09da-43c1-9b94-c04dbe67f195] succeeded in 2.00813938997s: 1
[2014-11-11 14:03:18,485: INFO/MainProcess] Task tasks.mul[4ffb6d04-0cf2-4300-a0de-bf53acf6662d] succeeded in 2.00837794197s: 2
[2014-11-11 14:03:18,471: INFO/MainProcess] Task tasks.mul[82a6b814-53a5-45f1-a0dc-43885f92eca4] succeeded in 2.009012155s: 5

I hoped/expected the process to wait until every chain had finished before calling chord unlock.
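The log suggests why the chord fired early. Its header is the single chain `generate.si() | dmap.s(x, y)`, so the header's result is whatever dmap returns. dmap returns `group(chains)()`, a GroupResult handle, as soon as the chains have been sent (the `tasks.dmap[...] succeeded ... <GroupResult ...>` line at 14:03:10,401), so the chord treats its header as done while most add/mul tasks are still running; xsum then reads `.result` from unfinished results and gets None. The stdlib sketch below (threads standing in for workers, a `threading.Event` holding the chains back; all names hypothetical) reproduces the difference between "the dispatching step has returned" and "every leaf has finished".

```python
import threading
from concurrent.futures import ThreadPoolExecutor

gate = threading.Event()  # holds every "chain" back until we open it


def chain_for(arg):
    """Stand-in for add.s(0) | mul.s(1) on one item; blocks until the gate opens."""
    gate.wait()
    return (arg + 0) * 1


def dispatch(items, pool):
    """Like the question's dmap: start one chain per item and return handles at once."""
    return [pool.submit(chain_for, item) for item in items]


with ThreadPoolExecutor(max_workers=5) as pool:
    handles = dispatch([1, 2, 3, 4, 5], pool)
    # The dispatching step has already returned, yet no chain has finished.
    finished_early = [h.done() for h in handles]
    gate.set()  # let the chains complete
    # Waiting on every leaf result is what the chord body actually needs.
    total = sum(h.result() for h in handles)

print(finished_early)  # [False, False, False, False, False]
print(total)           # 15
```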

Best answer

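As @ChillarAnand suggested, I ended up redesigning my tasks, but I did it in a way that removes the need for a chord. I wanted to be able to have a group of chains, which (as far as I can tell) meant I couldn't combine it with a chord.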

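What I do now is trigger the "final" task as part of triggering the group of chains. To make this work, the final task has to check whether the other tasks have finished. Because I know my last task (in my real-world program) writes to a database, I can simply check whether the database has a row for every generated item.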

For anyone facing a similar problem, the relevant part of the final function looks roughly like this:

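from celery import shared_task


class NotReady(Exception):
    pass


@shared_task(bind=True, default_retry_delay=30, max_retries=10)
def output(self, generated_list):

    list_from_db = ...  # query db ...
    try:
        # raise_if_not_equal (not shown) raises NotReady until every item has a row
        raise_if_not_equal(list_from_db, generated_list)
    except NotReady as exc:
        # self.retry() raises celery.exceptions.Retry, which re-queues this task
        # with the countdown instead of blocking the worker
        raise self.retry(exc=exc, countdown=30)

    # ... everything is ready, do stuff ...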
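The answer does not show `raise_if_not_equal`. A hypothetical version, assuming both arguments are lists of hashable item identifiers and that order does not matter, could compare them as multisets and report which items are still missing:

```python
from collections import Counter


class NotReady(Exception):
    """Raised while the database does not yet match the generated items."""


def raise_if_not_equal(list_from_db, generated_list):
    """Hypothetical helper: order-insensitive comparison, duplicates counted.

    Raises NotReady carrying the sorted list of items not yet in the database.
    """
    if Counter(list_from_db) != Counter(generated_list):
        missing = Counter(generated_list) - Counter(list_from_db)
        raise NotReady(sorted(missing.elements()))


raise_if_not_equal([3, 1, 2], [1, 2, 3])  # all rows present: no exception
try:
    raise_if_not_equal([1, 3], [1, 2, 3])
except NotReady as exc:
    print(exc.args[0])  # [2]
```

Inside the task, `list_from_db` would come from the database query the answer elides.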

FWIW: I may update the retry to use back-off, based on the following thread.
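For the back-off mentioned above, one common scheme is exponential: wait `base * 2**retries` seconds, capped at some maximum. The helper below is a hypothetical sketch; in a task declared with `bind=True`, Celery exposes the number of retries so far as `self.request.retries`, so the retry call could pass `countdown=backoff_countdown(self.request.retries)`. Newer Celery versions also offer `retry_backoff` and `retry_backoff_max` task options for tasks that use `autoretry_for`.

```python
def backoff_countdown(retries, base=30, cap=600):
    """Seconds to wait before the next retry: base * 2**retries, capped at `cap`.

    `base` matches the answer's 30-second delay; `cap` is an assumed upper bound.
    """
    return min(cap, base * 2 ** retries)


# Inside a bound task (bind=True), e.g.:
#     raise self.retry(exc=exc, countdown=backoff_countdown(self.request.retries))
print([backoff_countdown(n) for n in range(6)])  # [30, 60, 120, 240, 480, 600]
```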

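This feels like a good answer. Most importantly, because this task raises an exception (and is retried later), I never tie up a worker polling to find out whether everything has finished.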
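The point about not tying up a worker can be illustrated without a broker. In the pure-Python simulation below (`Retry`, `run_worker`, `make_writer` and the in-memory `db_rows` are hypothetical stand-ins, not Celery APIs), a single worker pops tasks from a queue ordered by ETA. The "final" task raises `Retry` while rows are missing, which re-queues it 30 virtual seconds later, so the worker spends that time running the chain tasks rather than polling. This is roughly what `self.retry()` achieves in Celery, where the retried task is re-sent to the broker with a countdown.

```python
import heapq
import itertools


class Retry(Exception):
    """Hypothetical stand-in for celery.exceptions.Retry: just carries a countdown."""

    def __init__(self, countdown):
        super().__init__(countdown)
        self.countdown = countdown


def run_worker(named_tasks):
    """Single simulated worker with a virtual clock (no real sleeping).

    A task that raises Retry is pushed back onto the queue with a later ETA,
    so the worker moves on to other tasks instead of polling.
    """
    seq = itertools.count()  # tie-breaker so the heap never compares functions
    queue = [(0, next(seq), name, fn) for name, fn in named_tasks]
    heapq.heapify(queue)
    clock, results, log = 0, {}, []
    while queue:
        eta, _, name, fn = heapq.heappop(queue)
        clock = max(clock, eta)  # jump straight to the next due task
        try:
            results[name] = fn()
            log.append((clock, name, "done"))
        except Retry as retry:
            log.append((clock, name, "retry"))
            heapq.heappush(queue, (clock + retry.countdown, next(seq), name, fn))
    return results, log


generated = [1, 2, 3, 4, 5]
db_rows = {}  # stands in for the table the last task of each chain writes to


def make_writer(item):
    def write():
        db_rows[item] = (item + 0) * 1  # result of add(item, 0) then mul(_, 1)
        return db_rows[item]
    return write


def output():
    # The "final" task: not ready until there is a row for every generated item.
    if any(item not in db_rows for item in generated):
        raise Retry(countdown=30)
    return sum(db_rows[item] for item in generated)


# The final task is queued first, alongside the chains, as in the answer.
tasks = [("output", output)] + [("write-%d" % i, make_writer(i)) for i in generated]
results, log = run_worker(tasks)
print(results["output"])  # 15
print([entry for entry in log if entry[1] == "output"])
# [(0, 'output', 'retry'), (30, 'output', 'done')]
```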

Original question on Stack Overflow (python - Celery chord not waiting for subtasks (a group of chains)): https://stackoverflow.com/questions/26866881/
