我正在尝试将 celery 集成到我的应用程序中,但我收到此错误提示 Received unregistered task of type ""。该消息已被忽略和丢弃。
我的 Celery 应用程序实例是这样创建的:
from celery import Celery
def make_celery(app):
celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
我的任务文件是这样的:
from flask import current_app
from .. import celery
from ..models.models import MobileRedemption
@celery.task(name='process_new_redemption')
def task_process_new_redemption(red_id):
redemption = MobileRedemption.objects(id=red_id).first()
if redemption:
assert isinstance(redemption, MobileRedemption)
print ("Redemption Successful.....!")
@celery.task(name='process_delete_redemption')
def task_delete_redemption(red_id):
current_app.logger.info("reached here!")
redemption = MobileRedemption.objects(id=red_id).first()
print(redemption)
redemption.delete()
我做错了什么?
最佳答案
在您的 Celery
构造中,您应该包含
您的任务文件:
celery = Celery(app.import_name,
broker=app.config['CELERY_BROKER_URL'],
include=['path.to.tasks'])
关于python - 在 Flask-Celery 中收到类型为 ""的未注册任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46523635/