python - Celery 4.1 周期性任务错误

标签 python django celery celerybeat

我正在尝试设置一个每十秒运行一次的任务。使用 Celery Beat。

我正在使用:

 Django==1.11.3
 celery==4.1.0
 django-celery-beat==1.1.1
 django-celery-results==1.0.1

它给我以下错误:

收到类型为“operations.tasks.message”的未注册任务

我是 Celery 的新手,我尝试了很多解决方案,但似乎找不到解决方案,希望得到帮助

settings.py

CELERY_BROKER_URL = 'pyamqp://guest@localhost//'
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Africa/Johannesburg'
CELERY_BEAT_SCHEDULE = {
        'message': {
        'task': 'operations.tasks.message',
        'schedule': 10.0
    }
    }

celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'nodiso.settings')

app = Celery('nodiso')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

__init__.py

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']

任务.py

from __future__ import absolute_import, unicode_literals
from celery import shared_task
from operations import models
from .celery import periodic_task



@task
def message():
    t = models.Celerytest.objects.create(Message='Hello World')
    t.save()

文件结构

proj-
     proj-
         __init__.py
         settings.py-
         celery.py-
     app-
         tasks.py-

最佳答案

在我的 celery.py 文件中,我这样定义 app:

app = Celery(
    'your_celery_app_name',
    include=[
        'your_celery_app_name.module.task1',
        'your_celery_app_name.module.task2',
    ]
)
app.config_from_object('your_celery_app_name.celeryconfig')

我的 celeryconfig.py 是我定义节拍和其他设置的地方(我认为这与您的 settings.py 相同)。

下面可能不相关 - 我不是 Python 专家,也不是应该如何将包放在一起的专家 - 但根据我有限的理解,你的任务应该是你的 celery 应用程序模块的子模块。不过,请将此与少许盐一起服用。

我的项目结构看起来更像这样:

your_celery_app_name (dir)
    setup.py (file)
    your_celery_app_name (dir)
        __init__.py (file)
        celery.py (file)
        celeryconfig.py (file)
        module (dir)
            __init__.py (importing task1 and task2 from tasks)
            tasks.py (implementing task1 and task2)

关于python - Celery 4.1 周期性任务错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48948415/

相关文章:

python - random.randint 函数没有按照我希望的方式工作

python - Bottle 服务器在计算时没有响应

python - Seaborn:来自两个数据框的分组箱线图

python lxml 添加一个保留所有父树的子元素

django - 如何访问 Django-CMS 插件中的请求对象?

django - docker-compose不下载对requirements.txt文件的添加

django - 如何在django-analytical中使用通用的“analytical。*”标签

python - Celery:运行多个异步子任务会导致 "never call results.get()"错误

python - 可以等到 celery 组做好吗?

python - Celery worker 变量共享问题