python - celery (Redis)结果后端不工作

标签 python django redis celery django-celery

我有一个使用 Django 的 Web 应用程序,并且我正在使用 Celery 进行一些异步任务处理。

对于 Celery,我使用 Rabbitmq 作为代理,使用 Redis 作为结果后端。

Rabbitmq 和 Redis 在本地虚拟机上托管的同一个 Ubuntu 14.04 服务器上运行。

Celery 工作人员正在远程机器上运行 (Windows 10)(没有工作人员在 Django 服务器上运行)。

我有三个问题(我认为它们以某种方式相关!)。

  1. 无论任务成功还是失败,任务都保持在“PENDING”状态。
  2. 任务在失败时不会重试。尝试重试时出现此错误:

reject requeue=False: [WinError 10061] No connection could be made because the target machine actively refused it

  1. 结果后端似乎无法正常工作。

我也对我的设置感到困惑,我不知道这个问题可能来自哪里!

这是我目前的设置:

my_app/settings.py

# region Celery Settings
CELERY_CONCURRENCY = 1
CELERY_ACCEPT_CONTENT = ['json']
# CELERY_RESULT_BACKEND = 'redis://:C@pV@lue2016@cvc.ma:6379/0'
BROKER_URL = 'amqp://soufiaane:C@pV@lue2016@cvc.ma:5672/cvcHost'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_ACKS_LATE = True
CELERYD_PREFETCH_MULTIPLIER = 1

CELERY_REDIS_HOST = 'cvc.ma'
CELERY_REDIS_PORT = 6379
CELERY_REDIS_DB = 0
CELERY_RESULT_BACKEND = 'redis'
CELERY_RESULT_PASSWORD = "C@pV@lue2016"
REDIS_CONNECT_RETRY = True

AMQP_SERVER = "cvc.ma"
AMQP_PORT = 5672
AMQP_USER = "soufiaane"
AMQP_PASSWORD = "C@pV@lue2016"
AMQP_VHOST = "/cvcHost"
CELERYD_HIJACK_ROOT_LOGGER = True
CELERY_HIJACK_ROOT_LOGGER = True
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
# endregion

my_app/celery_settings.py

from __future__ import absolute_import
from django.conf import settings
from celery import Celery
import django
import os

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'my_app.settings')
django.setup()
app = Celery('CapValue', broker='amqp://soufiaane:C@pV@lue2016@cvc.ma/cvcHost', backend='redis://:C@pV@lue2016@cvc.ma:6379/0')

# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

my_app__init__.py

from __future__ import absolute_import

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.

from .celery_settings import app as celery_app

my_app\email\tasks.py

from __future__ import absolute_import
from my_app.celery_settings import app

# here i only define the task skeleton because i'm executing this task on remote workers !
@app.task(name='email_task', bind=True, max_retries=3, default_retry_delay=1)
def email_task(self, job, email):
    try:
        print("x")
    except Exception as exc:
        self.retry(exc=exc)

在 worker 方面,我有一个文件“tasks.py”,其中包含任务的实际实现:

worker \tasks.py

from __future__ import absolute_import
from celery.utils.log import get_task_logger
from celery import Celery


logger = get_task_logger(__name__)
app = Celery('CapValue', broker='amqp://soufiaane:C@pV@lue2016@cvc.ma/cvcHost', backend='redis://:C@pV@lue2016@cvc.ma:6379/0')

@app.task(name='email_task', bind=True, max_retries=3, default_retry_delay=1)
def email_task(self, job, email):
    try:
        """
        The actual implementation of the task
        """
    except Exception as exc:
        self.retry(exc=exc)

我注意到的是:

  • 当我将工作人员中的代理设置更改为错误密码时,出现无法连接到代理错误。
  • 当我将工作人员中的结果后端设置更改为错误密码时,它会正常运行,就好像一切正​​常。

什么可能导致我出现这些问题?

编辑

在我的 Redis 服务器上,我已经启用了远程连接

/etc/redis/redis.conf

... 绑定(bind) 0.0.0.0 ...

最佳答案

我的猜测是您的问题出在密码上。 您的密码中包含 @,可以解释为 user:passhost 部分之间的分隔符。

工作人员处于待处理状态,因为他们无法正确连接到代理。 来自 celery 的文档 http://docs.celeryproject.org/en/latest/userguide/tasks.html#pending

PENDING Task is waiting for execution or unknown. Any task id that is not known is implied to be in the pending state.

关于python - celery (Redis)结果后端不工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35539778/

相关文章:

Python Apscheduler 条件任务

python - 'dict' 对象不可调用

python-2.7 - 移动应用聊天解决方案

python - PyTorch 数据集中哪里使用了 len 函数?

python - 在 Python 中追加递归方法的最佳方式是什么?

.net - 通过 HTTP POST 使用 HttpWebRequest 和 HttpWebResponse 从 .NET 进行 Django 身份验证

django social-registration 重定向 url 有时/social/setup 和有时/accounts/profile 。为什么?

java - 关于jredis性能的问题

elasticsearch - 来自 tomcat 集群的日志传送

python - 在不知道索引是否存在的情况下处理访问 json 值的 pythonic 方法是什么?