我使用 Flower 来监控我的 Celery 任务。
我正在尝试更改任务的显示方式(在任务选项卡下),以使列表看起来更有“组织性”。例如,显示<list (6 items)>
而不是[1, 2, 3, ...
.
不幸的是,overriding the format_task
method有局限性:
-
task.args
和task.kwargs
是字符串表示形式,通常会被截断,而不是列表/字典 - 除
task.name
之外的每个字段的 HTML 均进行转义 - 如果函数没有返回值,则在显示任务时会抛出 AJAX 错误
获取原件args
和kwargs
对象返回我正在使用的eval(task.args)
以便我之后可以迭代他们的项目。评估随机字符串对我来说看起来有点不安全,您会推荐比这样做更好的方法吗?
最佳答案
我在 format_task
中找到了获取任务参数的解决方案作为对象(列表/字典等)而不是截断的字符串表示形式。
argsrepr
和kwargsrepr
apply_async()
的参数方法允许为任务参数指定自定义表示。
我创建了一个自定义Task类,覆盖delay
方法如下:
import json
from celery import Task
class FlowerTask(Task):
def delay(self, *args, **kwargs):
argsrepr, kwargsrepr = [], {}
for arg in args:
if isinstance(arg, bytes):
argsrepr.append("<binary content>")
elif isinstance(arg, list):
argsrepr.append("<list ({} items)>".format(len(arg)))
elif isinstance(arg, dict):
argsrepr.append("<dict ({} keys)>".format(len(arg)))
else:
# Format your args the way you prefer
for key, value in kwargs.items():
# Format your kwargs the same way as above
# if ... :
# kwargsrepr.append(...)
# Create the task
new_task = super().s(*args, **kwargs)
# Use our representations as JSON
return new_task.apply_async(
argsrepr=json.dumps(argsrepr),
kwargsrepr=json.dumps(kwargsrepr)
)
然后,我使用 base
使用此类作为我的任务的基础论点:
@shared_task(base=FlowerTask)
def test_task(*args, **kwargs):
return "OK !"
这样,任务的参数表示形式就会存储为 JSON,随后可以在 Flower 的 format_task()
中加载。并将它们用作对象而不是字符串:
def format_task(task):
argsrepr = json.loads(task.args)
kwargsrepr = json.loads(task.kwargs)
if not argsrepr:
task.args = "( )"
else:
task.args = ', '.join(argsrepr)
if not kwargsrepr:
task.kwargs = "( )"
else:
task.kwargs = ', '.join(f'{key} = {value}' for key, value in kwargsrepr.items())
# [...]
这样,参数就会显示如下:
Args:
<list (3 items)>, <binary content>
Kwargs:
callback = <function>, items = <dict (5 items)>
关于python - Flower 中的高级任务格式化(Celery 监控),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53888127/