python - 如何在 Django 设置中配置 CELERYBEAT_SCHEDULE?

标签 python django celery periodic-task

我可以让它作为独立应用程序运行,但我无法让它在 Django 中运行。

这是独立的代码:

from celery import Celery
from celery.schedules import crontab


app = Celery('tasks')
app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_TIMEZONE='US/Central',
    CELERY_ENABLE_UTC=True,
    CELERYBEAT_SCHEDULE = {
    'test': {
        'task': 'tasks.test',
        'schedule': crontab(),
        },
    }
)

@app.task
def test():
    with open('test.txt', 'a') as f:
        f.write('Hello, World!\n')`

它每分钟向 Rabbitmq 服务器提供数据并写入文件。它就像一个魅力,但是当我试图让它在 Django 中工作时,我得到了这个错误:

Did you remember to import the module containing this task? Or maybe you are using relative imports? Please see ____ for more information.

消息正文的完整内容是:{'retries': 0, 'eta': None, 'kwargs':{},'taskset':无,'timelimit':[无,无],'回调': 无,'任务':'proj.test','args':[],'过期':无,'id': '501ca998-b5eb-4ba4-98a8-afabda9e88dd', 'utc': True, 'errbacks': None, 'chord':无}(246b)追溯(最近一次通话):文件 “/home/user/CeleryDjango/venv/lib/python3.5/site-packages/celery/worker/consumer.py”, 第 456 行,在 on_task_received strategies[name](message, body, KeyError: 'proj.test' [2016-06-16 01:16:00,051: INFO/Beat] Scheduler: 发送到期任务测试 (proj.test) [2016-06-16 01:16:00,055: ERROR/MainProcess] 收到未注册 “proj.test”类型的任务。

这是我在 Django 中的代码:

# CELERY STUFF
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'US/Central'
CELERYBEAT_SCHEDULE = {
    'test': {
        'task': 'proj.test',
        'schedule': crontab(),
    }
}

celery .py

from __future__ import absolute_import

import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
from django.conf import settings  # noqa

app = Celery('proj')

app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

任务.py

from __future__ import absolute_import
from celery import shared_task


@shared_task
def test():
    with open('test.txt', 'w') as f:
        print('Hello, World', file=f)

初始化文件

from __future__ import absolute_import

from .celery import app as celery_app 

非常感谢对此有任何想法。谢谢。

最佳答案

你为什么不试试下面的方法,让我知道它是否适合你。它对我有用。

在 settings.py 中

CELERYBEAT_SCHEDULE = {
    'my_scheduled_job': {
        'task': 'run_scheduled_jobs', # the same goes in the task name
        'schedule': crontab(),
    },
}

在 tasks.py..

from celery.task import task # notice the import of task and not shared task. 

@task(name='run_scheduled_jobs') # task name found! celery will do its job
def run_scheduled_jobs():
    # do whatever stuff you do
    return True

但是如果你正在寻找 shared_task 那么..

@shared_task(name='my_shared_task') # name helps celery identify the functions it has to run
def my_shared_task():
    # do what you want here..
    return True

我对异步作业使用共享任务..所以我需要从如下函数调用它..

在您的项目应用程序中的 views.py/或 anywhere.py 中

def some_function():
    my_shared_task.apply_async(countdown= in_seconds)
    return True

以防万一,如果您忘记了,请记住包含您尝试运行任务的应用程序。

INSTALLED_APPS = [...
 'my_app'...
] # include app

我确信这种方法工作正常..谢谢

关于python - 如何在 Django 设置中配置 CELERYBEAT_SCHEDULE?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37848481/

相关文章:

django - PyCharm + Python 3.6 + Django + 调试 + 生成器 == 痛苦的世界

python - 使用 cython 并行化 python 循环 numpy.searchsorted

python - 如何访问 Pandas 系列中的最后一个元素

Django 中的 Javascript 驱动表单

python - 报告用户 Django 错误报告

python - Celery send_task 不发送任务

Django - 是否应该始终通过任务处理程序(例如 Celery)发出外部 API 请求?

python - Celery为每个任务实例安排了带有到期时间的任务?

python - 可以根据 fixture 值跳过测试吗?

python - 如何将一串数字转换为 Python 中的列表?