我想重写 Celery 的 Task 类。我可以重写 on_success 和 on_failure 方法,但是 run 方法对我来说并不那么容易。我尝试使用 bind 方法。我的代码如下:
class MyTask(Task):
def on_success(self, retval, task_id, args, kwargs):
print("success")
def on_failure(self, exc, task_id, args, kwargs, einfo):
print("failed")
def bind(self, app):
return super(self.__class__, self).bind(app)
def run(self, *args, **kwargs):
x = kwargs.get('data', None)
x = x**2
if __name__=="__main__":
mytask = MyTask()
app = Celery('mytask', backend='redis', broker='redis://localhost')
mytask.bind(app)
job = mytask.apply_async(data = 1)
但是当我运行代码时出现以下错误:
Received unregistered task of type None.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
Traceback (most recent call last):
File "/home/ayandeh/anaconda3/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 559, in on_task_received
strategy = strategies[type_]
KeyError: None
我搜索了很多,但没有结果。我该如何注册任务?
最佳答案
不需要显式绑定(bind)应用程序,因为 celery 任务在调用 apply_async 时会自动绑定(bind)到 current_app。 如果您想显式绑定(bind),有两种方法可用: 1. 任务.bind(应用程序) 2.从app.Task创建Task类。通过继承app.Task,就会绑定(bind)这个app。
您的问题与绑定(bind)无关。这是关于任务注册表的。更正如下:
from celery import Celery, Task
class MyTask(Task):
name = 'mytask'
def on_success(self, retval, task_id, args, kwargs):
print("success")
def on_failure(self, exc, task_id, args, kwargs, einfo):
print("failed")
def run(self, *args, **kwargs):
x = kwargs.get('data', None)
print(x**2)
if __name__ == "__main__":
app = Celery('mytask', backend='redis', broker='redis://localhost')
MyTask.bind(app)
app.tasks.register(MyTask)
app.worker_main()
希望对您有帮助。
关于python - 如何将 celery 应用程序与 Task 类绑定(bind)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43960320/