python - 注册基于类的任务

标签 python celery celery-task

我使用的是 Celery 版本 4.0.2。

与以前版本的 Celery 相比,基于类的任务似乎不会自动注册(即,如果您配置了自动发现)。

但是,我什至没有实现手动注册基于类的任务。

根据 Celery 更改日志:

http://docs.celeryproject.org/en/latest/changelog.html#version-4-0-1

从版本 4.0.1 开始,应该可以手动注册任务:

from celery import Celery, Task
app = Celery()

class CustomTask(Task):

    def run(self):
        return 'hello'

app.register_task(CustomTask())

但这似乎不起作用。有谁知道如何实现这一目标?

我尝试了一些正在讨论的建议(除了集成 https://github.com/celery/celery/issues/3744 中提到的自定义任务加载器):

Register Celery Class-based Task

https://github.com/celery/celery/issues/3615

https://github.com/celery/celery/issues/3744

最佳答案

快到了!您需要对注册的任务调用delay()

这会起作用:

from celery import Celery, Task

app = Celery()


class CustomTask(Task):
    def run(self):
        return 'hello'


task = CustomTask()
app.register_task(task)

task.delay()

关于python - 注册基于类的任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44900235/

相关文章:

python - 将函数应用于可以返回多行的pandas DataFrame

python - ElementTree findall 'or' 运算符

python - 允许 sudo 访问 celery worker

python - Celery - 如何在工作人员关闭后更新任务状态?

django - 从前端使用 AJAX 查询 Celery 以了解创建的任务是否完成的最佳方法?

python - 将 "key : value"形式的字符串转换为列表或字典并提取值

python - 将 PyTorch 与 Celery 结合使用

Python SSL 连接 "EOF occurred in violation of protocol"

python - 与 Celery 任务共享 Pyramid 的数据库 session

python - 将对象保存到数据库时出现问题,因为模型包含 jsonfield sqlite3 django