我使用的是 Celery 版本 4.0.2。
与以前版本的 Celery 相比,基于类的任务似乎不会自动注册(即,如果您配置了自动发现)。
但是,我什至没有实现手动注册基于类的任务。
根据 Celery 更改日志:
http://docs.celeryproject.org/en/latest/changelog.html#version-4-0-1
从版本 4.0.1 开始,应该可以手动注册任务:
from celery import Celery, Task
app = Celery()
class CustomTask(Task):
def run(self):
return 'hello'
app.register_task(CustomTask())
但这似乎不起作用。有谁知道如何实现这一目标?
我尝试了一些正在讨论的建议(除了集成 https://github.com/celery/celery/issues/3744 中提到的自定义任务加载器):
Register Celery Class-based Task
最佳答案
快到了!您需要对注册的任务调用delay()
。
这会起作用:
from celery import Celery, Task
app = Celery()
class CustomTask(Task):
def run(self):
return 'hello'
task = CustomTask()
app.register_task(task)
task.delay()
关于python - 注册基于类的任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44900235/