我遇到了一个问题,我将一个任务放入队列并且它正在运行多次。 从 celery 日志中我可以看到同一个工作人员正在运行任务......
[2014-06-06 15:12:20,731: INFO/MainProcess] Received task: input.tasks.add_queue
[2014-06-06 15:12:20,750: INFO/Worker-2] starting runner..
[2014-06-06 15:12:20,759: INFO/Worker-2] collection started
[2014-06-06 15:13:32,828: INFO/Worker-2] collection complete
[2014-06-06 15:13:32,836: INFO/Worker-2] generation of steps complete
[2014-06-06 15:13:32,836: INFO/Worker-2] update created
[2014-06-06 15:13:33,655: INFO/Worker-2] email sent
[2014-06-06 15:13:33,656: INFO/Worker-2] update created
[2014-06-06 15:13:34,420: INFO/Worker-2] email sent
[2014-06-06 15:13:34,421: INFO/Worker-2] FINISH - Success
但是,当我查看应用程序的实际日志时,它会为每个步骤显示 5-6 行日志(??)。
我将 Django 1.6 与 RabbitMQ 一起使用。放入队列的方法是通过在函数上放置延迟。
这个函数(添加了任务装饰器(然后调用一个运行的类。
有人知道解决此问题的最佳方法吗?
编辑:这里是代码,
views.py
在我看来,我正在通过...将我的数据发送到队列
from input.tasks import add_queue_project
add_queue_project.delay(data)
tasks.py
from celery.decorators import task
@task()
def add_queue_project(data):
""" run project """
logger = logging_setup(app="project")
logger.info("starting project runner..")
f = project_runner(data)
f.main()
class project_runner():
""" main project runner """
def __init__(self,data):
self.data = data
self.logger = logging_setup(app="project")
def self.main(self):
.... Code
settings.py
THIRD_PARTY_APPS = (
'south', # Database migration helpers:
'crispy_forms', # Form layouts
'rest_framework',
'djcelery',
)
import djcelery
djcelery.setup_loader()
BROKER_HOST = "127.0.0.1"
BROKER_PORT = 5672 # default RabbitMQ listening port
BROKER_USER = "test"
BROKER_PASSWORD = "test"
BROKER_VHOST = "test"
CELERY_BACKEND = "amqp" # telling Celery to report the results back to RabbitMQ
CELERY_RESULT_DBURI = ""
CELERY_IMPORTS = ("input.tasks", )
celery
我运行的那行是启动celery,
python2.7 manage.py celeryd -l info
谢谢,
最佳答案
我没有给你一个确切的答案,但有几件事你应该看看:
djcelery
已被弃用,因此如果您使用新版本的celery
可能会出现某种冲突。如果您的
input
应用在INSTALLED_APPS
中列出,celery 会发现它,因此您不需要将它添加到CELERY_IMPORTS = ("input.tasks", )
,这可能是您问题的原因,因为任务可能会被加载多次尝试给你的任务起一个名字
@task(name='input.tasks.add')
,它会知道这是同一个任务,无论你如何导入它。
查看您的设置,您似乎正在使用旧版本的 celery ,或者您正在为新版本的 celery 使用旧配置。在任何情况下,请确保您拥有最新版本并尝试使用此配置而不是您拥有的配置:
BROKER_URL = 'amqp://<user>:<password>@localhost:5672/<vhost>'
CELERY_RESULT_BACKEND = 'amqp'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
现在,您还必须以不同方式配置 celery :
完全摆脱 djcelery
东西。
在你的 django 项目中创建 proj/celery.py
:
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'settings')
app = Celery('proj')
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
在你的proj/__init__.py
中:
from __future__ import absolute_import
from proj.celery import app as celery_app
然后,如果您的 input
应用是可重复使用的应用并且不是您项目的一部分,请使用 @shared_task
而不是 @task
装饰器。
然后运行 celery :
celery -A proj worker -l info
希望对您有所帮助。
关于python - 多次运行 Celery/Django 单个任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24085621/