python - 在 Flask-Celery 中收到类型为 ""的未注册任务

标签 python flask celery

我正在尝试将 celery 集成到我的应用程序中,但我收到此错误提示 Received unregistered task of type ""。该消息已被忽略和丢弃。我的 Celery 应用程序实例是这样创建的:

from celery import Celery


def make_celery(app):
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery

我的任务文件是这样的:

from flask import current_app
from .. import celery

from ..models.models import MobileRedemption


@celery.task(name='process_new_redemption')
def task_process_new_redemption(red_id):
    redemption = MobileRedemption.objects(id=red_id).first()

    if redemption:
        assert isinstance(redemption, MobileRedemption)
    print ("Redemption Successful.....!")


@celery.task(name='process_delete_redemption')
def task_delete_redemption(red_id):
    current_app.logger.info("reached here!")
    redemption = MobileRedemption.objects(id=red_id).first()
    print(redemption)
    redemption.delete()

我做错了什么?

最佳答案

在您的 Celery 构造中,您应该包含您的任务文件:

celery = Celery(app.import_name, 
                broker=app.config['CELERY_BROKER_URL'],
                include=['path.to.tasks'])

关于python - 在 Flask-Celery 中收到类型为 ""的未注册任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46523635/

相关文章:

python - 在 PyQt 中强制更新 QTableView + QSqlTableModel

python - xml.etree.ElementTree - 设置 xmlns = '...' 时遇到问题

python - Heroku:如何将静态站点生成与 Python/Flask 集成

django - 如何让 django 执行远程 celery 任务?似乎忽略了 settings.py 中的 BROKER_URL

python - 有向线段之间的符号角

Python 提取列表中嵌套 JSON 中的所有键值

opencv - Flask 视频流、多处理、CPU 使用率上限为单核级别

css - 使用 Jinja2 根据单元格值对表格中的单元格进行颜色编码

python - CELERY_IMPORTS、CELERY_RESULT_BACKEND 已弃用。如何使用替代?

python - 使用 Celery 成功执行一个函数后运行另一个任务