基于这些示例:
https://blog.balthazar-rouberol.com/celery-best-practices
https://shulhi.com/2015/10/13/class-based-celery-task/
Class based Task in django celery
我想创建类似的东西:
class CalculationWorker(Task):
def __init__(self, id_user):
self._id_user = id_user
self._user = get_object_or_404(User, pk=self._id_user)
# I need to understand if the bind work or if it's needed
def _bind(self, app):
return super(self.__class__, self).bind(celery_app)
def _retrieve_some_task(self):
# long calculation
def _long_run_task(self):
# long calculation
self._retrieve_some_task()
# Main entry
def run(self):
self._long_run_task()
#
def run_job():
worker = CalculationWorker(id_user=323232)
task = worker.apply_async()
文档似乎说这是可能的(无论如何我不清楚)http://docs.celeryproject.org/en/latest/userguide/tasks.html#custom-task-classes 。它甚至说:
“”“ 这意味着每个进程只会调用一次 init 构造函数,并且任务类在语义上更接近 Actor。 “”
但是http://docs.celeryproject.org/en/latest/whatsnew-4.0.html#the-task-base-class-no-longer-automatically-register-tasks明确地说:“最佳实践是仅使用自定义任务类来覆盖一般行为,然后使用任务装饰器来实现任务”。
结果我得到了一个 NotRegistered 异常,因为 https://github.com/celery/celery/issues/3548 ,但添加 app.tasks.register(CalculationWorker())
并没有解决问题。
我正在使用 Django 1.10.X 和 Celery 4.0.0
这种方法仍然有效吗?
谢谢
最佳答案
如果您使用 celery-4.0.1,那么您应该检查 chris 1
指出的文档 docs
The Task class is no longer using a special meta-class that automatically registers the task in the task registry.
现在你应该像这样注册你的任务
class CustomTask(Task):
def run(self):
print('running')
app.register_task(CustomTask())
关于django - celery 4.0.0 和基于类的任务工作流程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40551478/