python - 收到类型 xx 的未注册任务。该消息已被忽略并丢弃

标签 python django celery

我在 ubuntu EC2 节点上有一个 django 项目,该项目执行计算密集型的长时间运行过程,通常需要 60 秒以上。我需要缓存结果。我一直在读http://www.caktusgroup.com/blog/2014/06/23/scheduling-tasks-celery/http://michal.karzynski.pl/blog/2014/05/18/setting-up-an-asynchronous-task-queue-for-django-using-celery-redis/与文档一起。我已经能够在命令行上完成基本任务,但现在我正尝试将其作为 django 脚本运行。

现在我的 django tp1 View 中的代码结构是:

from django.shortcuts import render
from django.http import HttpResponse
from django.views.decorators.csrf import csrf_exempt
from __future__ import absolute_import
from celery import shared_task

@csrf_exempt
def index(request):

    token = str(request.POST.get('token', False))
    calculator(token)
    return HttpResponse(token)

@shared_task
def calculator(token):

    # do calculation
    # store result in cache

    return

在命令行中我运行:

(env1)ubuntu@ip-172-31-22-65:~/projects/tp$ celery --app=tp.celery:app worker --loglevel=INFO

在我收到的消息末尾:

[2015-03-24 19:49:47,045: ERROR/MainProcess] Received unregistered task of type 'tp1.views.calculator'.
The message has been ignored and discarded.    Did you remember to import the module containing this task? Or maybe you are using relative imports?

我的任务.py:

from __future__ import absolute_import    

from celery import shared_task    

@shared_task
def test(param):
    return 'The test task executed with argument "%s" ' % param

我怎样才能让它工作?

最佳答案

首先,如果您使用自动发现,Celery 可能无法自动发现您的tasks.py 文件。你配置了吗according to the docs通过为其创建一个 celery_app 文件来自动发现您的任务:

# project/celery_app.py
from __future__ import absolute_import

import os

from celery import Celery

from django.conf import settings

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('project_name')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
# YOUR APPLICATION MUST BE LISTED IN settings.INSTALLED_APPS for this to work
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

此外,如果您依赖于autodiscover,它通常无法找到位于不名为tasks.py的模块中的内容。

因此,除了检查您的配置之外,我还会尝试将 views.py 中的任务移动到 tasks.py:

 # project/app_name/tasks.py
 from celery_app import app
 @app.task()
 def calculator(token):

    # do calculation
    # store result in cache

    return

最后,您可以从 tasks.py 导入此函数并在 View 中调用它。

希望有帮助。

关于python - 收到类型 xx 的未注册任务。该消息已被忽略并丢弃,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29241900/

相关文章:

django - 根据同一表单中另一个选择字段的值过滤管理表单中的 django 选择字段

mysql - Django IntegrityError 外键重复条目

python - 禁用 Celery 的 Django 调试

Django-allauth OAuth2 刷新 token

python - 为什么它找不到我的 celery 配置文件?

rest - 使用 Celery 或 RESTful API 服务深度学习模型?

python - 将 -inf 替换为零值

python - 如何处理来自谷歌地图 api 的错误?

python - REDIS:python 中的 redis 不返回任何内容

python - 在 virtualenv 中找不到 Atom.core