python - 如何将 celery 应用程序与 Task 类绑定(bind)?

标签 python celery celery-task

我想重写 Celery 的 Task 类。我可以重写 on_success 和 on_failure 方法,但是 run 方法对我来说并不那么容易。我尝试使用 bind 方法。我的代码如下:

class MyTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        print("success")

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print("failed")

    def bind(self, app):
        return super(self.__class__, self).bind(app)

    def run(self, *args, **kwargs):
        x = kwargs.get('data', None)
        x = x**2


if __name__=="__main__":
     mytask = MyTask()
     app = Celery('mytask', backend='redis', broker='redis://localhost')
     mytask.bind(app)
     job = mytask.apply_async(data = 1)

但是当我运行代码时出现以下错误:

Received unregistered task of type None.
The message has been ignored and discarded.

Did you remember to import the module containing this task?
Or maybe you're using relative imports?

Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.

The full contents of the message body was:
b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
Traceback (most recent call last):
  File "/home/ayandeh/anaconda3/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 559, in on_task_received
    strategy = strategies[type_]
KeyError: None

我搜索了很多,但没有结果。我该如何注册任务?

最佳答案

不需要显式绑定(bind)应用程序,因为 celery 任务在调用 apply_async 时会自动绑定(bind)到 current_app。 如果您想显式绑定(bind),有两种方法可用: 1. 任务.bind(应用程序) 2.从app.Task创建Task类。通过继承app.Task,就会绑定(bind)这个app。

您的问题与绑定(bind)无关。这是关于任务注册表的。更正如下:

from celery import Celery, Task

class MyTask(Task):
    name = 'mytask'

    def on_success(self, retval, task_id, args, kwargs):
        print("success")

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print("failed")

    def run(self, *args, **kwargs):
        x = kwargs.get('data', None)
        print(x**2)


if __name__ == "__main__":
    app = Celery('mytask', backend='redis', broker='redis://localhost')
    MyTask.bind(app)
    app.tasks.register(MyTask)
    app.worker_main()

希望对您有帮助。

关于python - 如何将 celery 应用程序与 Task 类绑定(bind)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43960320/

相关文章:

python - celery 任务状态总是挂起

Django 1.6 + RabbitMQ 3.2.3 + Celery 3.1.9 - 为什么我的 celery 工作人员会死掉并出现 : WorkerLostError: Worker exited prematurely: signal 11 (SIGSEGV)

python - 比较 2 个列表的元素并仅打印不同的一个

python - Pandas groupby 并与其他数据框求和

python - 使用机器人框架条件语句和关键字时出现语法错误

python - 使用 celery 提交任务的最快方法?

celery - celery 延迟的阻塞版本?

python - 为什么 RabbitMQ 不在持久队列中持久化消息?

python - Celery AttributeError : async error

python to_replace 字符串不等于一个值?