python - 在 Celery 中记录任务关系

标签 python celery

如果我的签名包含复杂的子任务结构(例如下面),是否有一种方法可以记录任务之间的所有关系,以便我可以在以后重新创建 DependencyGraph?

例如

complex_task = group(task1 | task2 | group(task3, task4, task5 | task6), task7, task8)

也许我可以捕获所有任务的 ID 及其父任务 ID,并记录它们?我可以用通用的方式做到这一点吗?例如task_postrun 或任务基类中的某些内容?

我的真正目标是,如果其中一个子任务失败,可以轻松地知道它发生在哪里(希望通过以图形方式显示任务结构,例如使用 graphviz )。

最佳答案

您可以根据任务结果生成DependencyGraph

In [4]: task_result = some_complex_task()

In [5]: task_result.parent.parent.graph
Out[5]: 285fa253-fcf8-42ef-8b95-0078897e83e6(1)
            463afec2-5ed4-4036-b22d-ba067ec64f52(0)
        872c3995-6fa0-46ca-98c2-5a19155afcf0(2)
            285fa253-fcf8-42ef-8b95-0078897e83e6(1)
            463afec2-5ed4-4036-b22d-ba067ec64f52(0)

您可以将这些图表转换为点文件:

In[22]:  with open('graph.dot', 'w') as fh:
   ...:     res.parent.parent.graph.to_dot(fh)

有关更多信息,请查看documentation .

示例 让我们打印任务的任务名称。

task_list.py:

from celery import Celery, chain
celery_app = Celery('my_tasks', broker='amqp://', backend='amqp://')

@celery_app.task()
def add(x, y):
    return x + y

@celery_app.task()
def sub(x, y):
    return x - y

c = chain(add.s(3, 4), sub.s(2))

任务 c 的子任务名称:

In [62]: r=c()

In [63]: r.parent.graph
Out[63]: 
d11c0076-a4e4-4e84-b26b-9b689860baa5(0)
68ba78cf-7e6c-4735-9173-2349da541b28(1)
     d11c0076-a4e4-4e84-b26b-9b689860baa5(0)

In [64]: r.graph
Out[64]: d11c0076-a4e4-4e84-b26b-9b689860baa5(0)

In [65]: while r:
   ....:     print(r.task_name)
   ....:     r = r.parent
   ....:     
task_list.sub
task_list.add

关于python - 在 Celery 中记录任务关系,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25989477/

相关文章:

redis - 我怎样才能得到周期性任务调度的结果

python - 为什么我得到 '_SIGCHLDWaker' object has no attribute 'doWrite' in Scrapy?

python - 在 Python 中从纬度/经度多边形计算面积

python - 在 Ubuntu 中编译 Python 2.6.6 并需要外部包 wxPython、setuptools 等...

python - 通过 TCP 使用 Python 连接 Teltonika 设备

python - celery ,用倒计时调用延迟

python - Pycryptodome RSA 解密导致大规模性能降级 (RPS)

python - Pandas:在数据帧之间复制一些值

python - 如何等待 celery 生产者 celery 工作人员完成其任务

python - 我需要一个 django-celery 守护进程来监听特定的rabbitmq channel