我有
celery==3.1.23
Django==1.9.1
redis==2.10.5
ii redis-server 2:2.8.19-3 amd64 Persistent key-value database with networ
ii redis-tools 2:2.8.19-3 amd64 Persistent key-value database with networ
我的配置设置有几行
# Celery
BROKER_URL = 'redis://127.0.0.1:6379/0'
BROKER_TRANSPORT = 'redis'
# start worker with '$ celery -A intro worker -l debug'
我的配置文件celery.py(标准做法是这样命名,但在我看来很困惑)是
from __future__ import absolute_import
import os
import django
from celery import Celery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'intro.settings')
django.setup()
app = Celery('intro')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
配置文件/etc/default/celeryd(同样令人困惑的命名)是
# copy this file to /etc/default/celeryd
CELERYD_NODES="w1 w2 w3"
VIRTUAL_ENV_PATH="/srv/intro/bin"
# JRT
CELERY_BIN="${VIRTUAL_ENV_PATH}/celery"
# Where to chdir at start.
CELERYD_CHDIR="/srv/intro/intro"
# Python interpreter from environment.
ENV_PYTHON="$VIRTUAL_ENV_PATH/python"
# How to call "manage.py celeryd_multi"
CELERYD_MULTI="$ENV_PYTHON $CELERYD_CHDIR/manage.py celeryd_multi"
# How to call "manage.py celeryctl"
CELERYCTL="$ENV_PYTHON $CELERYD_CHDIR/manage.py celeryctl"
# Extra arguments to celeryd NOTE --beat is vital, otherwise scheduler
# will not run
CELERYD_OPTS="--concurrency=1 --beat"
# %n will be replaced with the nodename.
CELERYD_LOG_FILE="/var/log/celery/%n.log"
CELERYD_PID_FILE="/var/run/celery/%n.pid"
# Workers should run as an unprivileged user.
CELERYD_USER="jimmy"
CELERYD_GROUP="jimmy"
# Name of the projects settings module.
export DJANGO_SETTINGS_MODULE="intro.settings"
#CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
#export DJANGO_SETTINGS_MODULE="settings"
#CELERYD_MULTI="/home/webapps/.virtualenvs/crowdstaff/bin/django-admin.py celeryd_detach"
我的/etc/init.d/celeryd 文件是
#!/bin/sh -e
VERSION=10.1
echo "celery init v${VERSION}."
if [ $(id -u) -ne 0 ]; then
echo "Error: This program can only be used by the root user."
echo " Unprivileged users must use the 'celery multi' utility, "
echo " or 'celery worker --detach'."
exit 1
fi
# Can be a runlevel symlink (e.g. S02celeryd)
if [ -L "$0" ]; then
SCRIPT_FILE=$(readlink "$0")
else
SCRIPT_FILE="$0"
fi
SCRIPT_NAME="$(basename "$SCRIPT_FILE")"
DEFAULT_USER="celery"
DEFAULT_PID_FILE="/var/run/celery/%n.pid"
DEFAULT_LOG_FILE="/var/log/celery/%n.log"
DEFAULT_LOG_LEVEL="INFO"
DEFAULT_NODES="celery"
DEFAULT_CELERYD="-m celery worker --detach"
CELERY_DEFAULTS=${CELERY_DEFAULTS:-"/etc/default/${SCRIPT_NAME}"}
# Make sure executable configuration script is owned by root
_config_sanity() {
local path="$1"
local owner=$(ls -ld "$path" | awk '{print $3}')
local iwgrp=$(ls -ld "$path" | cut -b 6)
local iwoth=$(ls -ld "$path" | cut -b 9)
if [ "$(id -u $owner)" != "0" ]; then
echo "Error: Config script '$path' must be owned by root!"
echo
echo "Resolution:"
echo "Review the file carefully and make sure it has not been "
echo "modified with mailicious intent. When sure the "
echo "script is safe to execute with superuser privileges "
echo "you can change ownership of the script:"
echo " $ sudo chown root '$path'"
exit 1
fi
if [ "$iwoth" != "-" ]; then # S_IWOTH
echo "Error: Config script '$path' cannot be writable by others!"
echo
echo "Resolution:"
echo "Review the file carefully and make sure it has not been "
echo "modified with malicious intent. When sure the "
echo "script is safe to execute with superuser privileges "
echo "you can change the scripts permissions:"
echo " $ sudo chmod 640 '$path'"
exit 1
fi
if [ "$iwgrp" != "-" ]; then # S_IWGRP
echo "Error: Config script '$path' cannot be writable by group!"
echo
echo "Resolution:"
echo "Review the file carefully and make sure it has not been "
echo "modified with malicious intent. When sure the "
echo "script is safe to execute with superuser privileges "
echo "you can change the scripts permissions:"
echo " $ sudo chmod 640 '$path'"
exit 1
fi
}
if [ -f "$CELERY_DEFAULTS" ]; then
_config_sanity "$CELERY_DEFAULTS"
echo "Using config script: $CELERY_DEFAULTS"
. "$CELERY_DEFAULTS"
fi
# Sets --app argument for CELERY_BIN
CELERY_APP_ARG=""
if [ ! -z "$CELERY_APP" ]; then
CELERY_APP_ARG="--app=$CELERY_APP"
fi
CELERYD_USER=${CELERYD_USER:-$DEFAULT_USER}
# Set CELERY_CREATE_DIRS to always create log/pid dirs.
CELERY_CREATE_DIRS=${CELERY_CREATE_DIRS:-0}
CELERY_CREATE_RUNDIR=$CELERY_CREATE_DIRS
CELERY_CREATE_LOGDIR=$CELERY_CREATE_DIRS
if [ -z "$CELERYD_PID_FILE" ]; then
CELERYD_PID_FILE="$DEFAULT_PID_FILE"
CELERY_CREATE_RUNDIR=1
fi
if [ -z "$CELERYD_LOG_FILE" ]; then
CELERYD_LOG_FILE="$DEFAULT_LOG_FILE"
CELERY_CREATE_LOGDIR=1
fi
CELERYD_LOG_LEVEL=${CELERYD_LOG_LEVEL:-${CELERYD_LOGLEVEL:-$DEFAULT_LOG_LEVEL}}
CELERY_BIN=${CELERY_BIN:-"celery"}
CELERYD_MULTI=${CELERYD_MULTI:-"$CELERY_BIN multi"}
CELERYD_NODES=${CELERYD_NODES:-$DEFAULT_NODES}
export CELERY_LOADER
if [ -n "$2" ]; then
CELERYD_OPTS="$CELERYD_OPTS $2"
fi
CELERYD_LOG_DIR=`dirname $CELERYD_LOG_FILE`
CELERYD_PID_DIR=`dirname $CELERYD_PID_FILE`
# Extra start-stop-daemon options, like user/group.
if [ -n "$CELERYD_CHDIR" ]; then
DAEMON_OPTS="$DAEMON_OPTS --workdir=$CELERYD_CHDIR"
fi
check_dev_null() {
if [ ! -c /dev/null ]; then
echo "/dev/null is not a character device!"
exit 75 # EX_TEMPFAIL
fi
}
maybe_die() {
if [ $? -ne 0 ]; then
echo "Exiting: $* (errno $?)"
exit 77 # EX_NOPERM
fi
}
create_default_dir() {
if [ ! -d "$1" ]; then
echo "- Creating default directory: '$1'"
mkdir -p "$1"
maybe_die "Couldn't create directory $1"
echo "- Changing permissions of '$1' to 02755"
chmod 02755 "$1"
maybe_die "Couldn't change permissions for $1"
if [ -n "$CELERYD_USER" ]; then
echo "- Changing owner of '$1' to '$CELERYD_USER'"
chown "$CELERYD_USER" "$1"
maybe_die "Couldn't change owner of $1"
fi
if [ -n "$CELERYD_GROUP" ]; then
echo "- Changing group of '$1' to '$CELERYD_GROUP'"
chgrp "$CELERYD_GROUP" "$1"
maybe_die "Couldn't change group of $1"
fi
fi
}
check_paths() {
if [ $CELERY_CREATE_LOGDIR -eq 1 ]; then
create_default_dir "$CELERYD_LOG_DIR"
fi
if [ $CELERY_CREATE_RUNDIR -eq 1 ]; then
create_default_dir "$CELERYD_PID_DIR"
fi
}
create_paths() {
create_default_dir "$CELERYD_LOG_DIR"
create_default_dir "$CELERYD_PID_DIR"
}
export PATH="${PATH:+$PATH:}/usr/sbin:/sbin"
_get_pidfiles () {
# note: multi < 3.1.14 output to stderr, not stdout, hence the redirect.
${CELERYD_MULTI} expand "${CELERYD_PID_FILE}" ${CELERYD_NODES} 2>&1
}
_get_pids() {
found_pids=0
my_exitcode=0
for pidfile in $(_get_pidfiles); do
local pid=`cat "$pidfile"`
local cleaned_pid=`echo "$pid" | sed -e 's/[^0-9]//g'`
if [ -z "$pid" ] || [ "$cleaned_pid" != "$pid" ]; then
echo "bad pid file ($pidfile)"
one_failed=true
my_exitcode=1
else
found_pids=1
echo "$pid"
fi
if [ $found_pids -eq 0 ]; then
echo "${SCRIPT_NAME}: All nodes down"
exit $my_exitcode
fi
done
}
_chuid () {
su "$CELERYD_USER" -c "$CELERYD_MULTI $*"
}
start_workers () {
if [ ! -z "$CELERYD_ULIMIT" ]; then
ulimit $CELERYD_ULIMIT
fi
_chuid $* start $CELERYD_NODES $DAEMON_OPTS \
--pidfile="$CELERYD_PID_FILE" \
--logfile="$CELERYD_LOG_FILE" \
--loglevel="$CELERYD_LOG_LEVEL" \
$CELERY_APP_ARG \
$CELERYD_OPTS
}
dryrun () {
(C_FAKEFORK=1 start_workers --verbose)
}
stop_workers () {
_chuid stopwait $CELERYD_NODES --pidfile="$CELERYD_PID_FILE"
}
restart_workers () {
_chuid restart $CELERYD_NODES $DAEMON_OPTS \
--pidfile="$CELERYD_PID_FILE" \
--logfile="$CELERYD_LOG_FILE" \
--loglevel="$CELERYD_LOG_LEVEL" \
$CELERY_APP_ARG \
$CELERYD_OPTS
}
kill_workers() {
_chuid kill $CELERYD_NODES --pidfile="$CELERYD_PID_FILE"
}
restart_workers_graceful () {
echo "WARNING: Use with caution in production"
echo "The workers will attempt to restart, but they may not be able to."
local worker_pids=
worker_pids=`_get_pids`
[ "$one_failed" ] && exit 1
for worker_pid in $worker_pids; do
local failed=
kill -HUP $worker_pid 2> /dev/null || failed=true
if [ "$failed" ]; then
echo "${SCRIPT_NAME} worker (pid $worker_pid) could not be restarted"
one_failed=true
else
echo "${SCRIPT_NAME} worker (pid $worker_pid) received SIGHUP"
fi
done
[ "$one_failed" ] && exit 1 || exit 0
}
check_status () {
my_exitcode=0
found_pids=0
local one_failed=
for pidfile in $(_get_pidfiles); do
if [ ! -r $pidfile ]; then
echo "${SCRIPT_NAME} down: no pidfiles found"
one_failed=true
break
fi
local node=`basename "$pidfile" .pid`
local pid=`cat "$pidfile"`
local cleaned_pid=`echo "$pid" | sed -e 's/[^0-9]//g'`
if [ -z "$pid" ] || [ "$cleaned_pid" != "$pid" ]; then
echo "bad pid file ($pidfile)"
one_failed=true
else
local failed=
kill -0 $pid 2> /dev/null || failed=true
if [ "$failed" ]; then
echo "${SCRIPT_NAME} (node $node) (pid $pid) is down, but pidfile exists!"
one_failed=true
else
echo "${SCRIPT_NAME} (node $node) (pid $pid) is up..."
fi
fi
done
[ "$one_failed" ] && exit 1 || exit 0
}
case "$1" in
start)
check_dev_null
check_paths
start_workers
;;
stop)
check_dev_null
check_paths
stop_workers
;;
reload|force-reload)
echo "Use restart"
;;
status)
check_status
;;
restart)
check_dev_null
check_paths
restart_workers
;;
graceful)
check_dev_null
restart_workers_graceful
;;
kill)
check_dev_null
kill_workers
;;
dryrun)
check_dev_null
dryrun
;;
try-restart)
check_dev_null
check_paths
restart_workers
;;
create-paths)
check_dev_null
create_paths
;;
check-paths)
check_dev_null
check_paths
;;
*)
echo "Usage: /etc/init.d/${SCRIPT_NAME} {start|stop|restart|graceful|kill|dryrun|create-paths}"
exit 64 # EX_USAGE
;;
esac
exit 0
它很旧,很长,似乎不包含任何我可以更改以影响使用的代理,除了默认值脚本 CELERY_DEFAULTS=/etc/default/celeryd
的位置(再次混淆名称).我承认我在没有完全理解的情况下很好地复制和粘贴了这个脚本,尽管我知道 init.d 脚本是如何工作的。
当我运行 /etc/init.d/celeryd start
工作人员启动,但忽略指向我的 redis 服务器的 BROKER django 设置,并尝试读取 RabbitMQ。日志文件/var/log/celery/w1.log
[2016-11-30 23:44:51,873: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 111] Connection refused.
所以 celery 正在尝试使用 RabbitMQ,而不是 Redis。还有其他帖子在 Stack overflow 上提示同样的问题,但没有一个得到解决(据我所知)。我把 djcelery 放在已安装的应用程序中,因为它似乎使 celeryd_multi
管理命令可用,但我不想使用 celery beat,文档说这不再是必需的。我设置了自己的队列来运行管理命令,过去我在设置 celerybeat 时遇到了太多问题。
我通过运行 sudo -u jimmy/srv/intro/bin/celery -A intro worker &
使它正常工作,这有效并使用了正确的 Redis 队列(有谁知道为什么它称为代理?),但不会在服务器电源循环时重新启动,不会写入日志文件,我只是觉得这不是运行 celery worker 的干净方法。
我真的不想使用/etc/init.d 脚本,因为这是做事的老方法,并且作为 upstart 运行来代替它,现在 systemd 是执行此操作的支持方式(如果我错了,请纠正我)。官方文档上没有提到这些方法 http://docs.celeryproject.org/en/v4.0.0/userguide/daemonizing.html#init-script-celeryd 这让我觉得不再支持 celery ,也许有更好的维护方式来做到这一点。奇怪的是它没有被内置到核心中。
我确实找到了这个 https://github.com/celery/celery/blob/3.1/extra/supervisord/supervisord.conf 但是配置文件中没有提到代理,我怀疑这对我使用 Redis 有帮助。
我如何让 Celery 作为守护进程运行以在重启时自动启动,并将 Redis 用作消息队列,或者是我使用 Celery 在 Django 中异步运行函数以使用 RabbitMQ 消息队列的唯一方法?
最佳答案
为确保 celery 加载正确的代理,将代理参数添加到 Celery 类。
app = Celery('intro', broker=settings.BROKER_URL)
引用: http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#application
关于redis - 将 Celery 作为守护进程运行不会使用 Redis 代理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41006223/