python - Celery worker disconnects from broker

Tags: python networking rabbitmq celery amqp

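I am using Python, RabbitMQ, and Celery to distribute tasks to workers. Each task takes about 15 minutes and is 99% CPU-bound. My system has 24 cores, and whenever a worker executes this task, I get a connection error with the broker.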

[2019-10-12 08:49:57,695: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
[...]
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

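I have found several other posts about this problem, but none of them fixed it. It happens especially under heavy CPU load. Any idea how I can solve this?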

Windows 10 (worker)

macOS 10.14 (RabbitMQ Server)

Python 3.7

Celery 4.3.0 (rhubarb)

RabbitMQ 3.7.16 (Erlang 22.0.7)

My configuration makes the worker consume only 1 task at a time, and the worker process is restarted after every job, but still no luck:

CELERYD_MAX_TASKS_PER_CHILD = 1,
CELERYD_CONCURRENCY = 1,
CELERY_TASK_RESULT_EXPIRES = 3600,
CELERYD_PREFETCH_MULTIPLIER = 1,
CELERY_MAX_CACHED_RESULTS = 1,
CELERY_ACKS_LATE = True,
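
The settings above use the old uppercase names. Celery 4 also accepts lowercase names, and the sketch below shows the equivalent mapping as a plain dictionary translation. The helper `to_new_style` and the variable names are hypothetical and exist only for illustration; the lowercase setting names are the ones listed in the old/new table of the Celery 4 configuration documentation.

```python
# Old-style (uppercase) settings, copied from the question.
OLD_SETTINGS = {
    "CELERYD_MAX_TASKS_PER_CHILD": 1,
    "CELERYD_CONCURRENCY": 1,
    "CELERY_TASK_RESULT_EXPIRES": 3600,
    "CELERYD_PREFETCH_MULTIPLIER": 1,
    "CELERY_MAX_CACHED_RESULTS": 1,
    "CELERY_ACKS_LATE": True,
}

# Old name -> new lowercase name, per the Celery 4 configuration docs.
OLD_TO_NEW = {
    "CELERYD_MAX_TASKS_PER_CHILD": "worker_max_tasks_per_child",
    "CELERYD_CONCURRENCY": "worker_concurrency",
    "CELERY_TASK_RESULT_EXPIRES": "result_expires",
    "CELERYD_PREFETCH_MULTIPLIER": "worker_prefetch_multiplier",
    "CELERY_MAX_CACHED_RESULTS": "result_cache_max",
    "CELERY_ACKS_LATE": "task_acks_late",
}


def to_new_style(old_settings):
    """Hypothetical helper: rename old-style keys to their lowercase equivalents."""
    return {OLD_TO_NEW[name]: value for name, value in old_settings.items()}


NEW_SETTINGS = to_new_style(OLD_SETTINGS)

# With Celery installed, this dict could be applied with
# app.conf.update(NEW_SETTINGS), where app is the Celery instance.
for name, value in sorted(NEW_SETTINGS.items()):
    print(f"{name} = {value!r}")
```

The values are unchanged; only the key names differ, so this does not by itself affect the connection problem.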

This is the full traceback:

[2019-10-12 08:49:57,695: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
File "C:\Python37\lib\site-packages\celery\worker\consumer\consumer.py", line 318, in start
    blueprint.start(self)
File "C:\Python37\lib\site-packages\celery\bootsteps.py", line 119, in start
    step.start(parent)
File "C:\Python37\lib\site-packages\celery\worker\consumer\consumer.py", line 596, in start
    c.loop(*c.loop_args())
File "C:\Python37\lib\site-packages\celery\worker\loops.py", line 118, in synloop
    qos.update()
File "C:\Python37\lib\site-packages\kombu\common.py", line 442, in update
    return self.set(self.value)
File "C:\Python37\lib\site-packages\kombu\common.py", line 435, in set
    self.callback(prefetch_count=new_value)
File "C:\Python37\lib\site-packages\celery\worker\consumer\tasks.py", line 47, in set_prefetch_count
    apply_global=qos_global,
File "C:\Python37\lib\site-packages\kombu\messaging.py", line 558, in qos
    apply_global)
File "C:\Python37\lib\site-packages\amqp\channel.py", line 1853, in basic_qos
    wait=spec.Basic.QosOk,
File "C:\Python37\lib\site-packages\amqp\abstract_channel.py", line 68, in send_method
    return self.wait(wait, returns_tuple=returns_tuple)
File "C:\Python37\lib\site-packages\amqp\abstract_channel.py", line 88, in wait
    self.connection.drain_events(timeout=timeout)
File "C:\Python37\lib\site-packages\amqp\connection.py", line 504, in drain_events
    while not self.blocking_read(timeout):
File "C:\Python37\lib\site-packages\amqp\connection.py", line 509, in blocking_read
    frame = self.transport.read_frame()
File "C:\Python37\lib\site-packages\amqp\transport.py", line 252, in read_frame
    frame_header = read(7, True)
File "C:\Python37\lib\site-packages\amqp\transport.py", line 438, in _read
    s = recv(n - len(rbuf))
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

Best answer

I found a solution to this problem. I think the problem lies with the Celery result backend. In my case, I am using Redis.

Here is my setup:

Broker - rabbitmq
Backend - redis
Python - 3.7
OS - Windows 10

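On the Celery client side, I tried checking the status of the task on the worker from the client every 60 seconds. With this in place, I did not run into the connection reset problem.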

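from time import sleep

# Poll the result backend every 60 seconds until the task finishes
while not doors_res.ready():
    sleep(60)
result = doors_res.get()

where doors_res is the AsyncResult returned when the task is submitted through app, the Celery instance.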
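
The polling loop above can be wrapped in a small helper with an overall deadline. This is a minimal sketch, not the answerer's code: `wait_for_result`, its parameters, and the `FakeResult` stand-in are hypothetical, and it assumes only that the result object exposes `ready()` and `get()` the way Celery's `AsyncResult` does.

```python
import time


def wait_for_result(result, poll_interval=60.0, timeout=None, sleep=time.sleep):
    """Poll result.ready() every poll_interval seconds, then return result.get().

    Raises TimeoutError if timeout (in seconds) elapses before the task is ready.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while not result.ready():
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError("task did not finish before the deadline")
        sleep(poll_interval)
    return result.get()


class FakeResult:
    """Stand-in for an AsyncResult so the sketch runs without a broker."""

    def __init__(self, polls_until_ready, value):
        self._remaining = polls_until_ready
        self._value = value

    def ready(self):
        # Report "not ready" for the first few polls, then "ready".
        if self._remaining > 0:
            self._remaining -= 1
            return False
        return True

    def get(self):
        return self._value


# Usage with the stand-in; a tiny interval keeps the demo fast.
outcome = wait_for_result(
    FakeResult(polls_until_ready=3, value="done"), poll_interval=0.01
)
print(outcome)
```

In real use the first argument would be the `AsyncResult` for the submitted task, and `poll_interval` would stay at 60 seconds as in the answer.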

On the Celery worker side:

celery worker -A <celery_file_name> -l info -P gevent

My task ran for about 2 hours and I did not encounter the connection reset error.

Source: "python - Celery worker disconnects from broker", based on a similar question on Stack Overflow: https://stackoverflow.com/questions/58356264/
