python - RabbitMQ 在处理长时间运行的任务和超时设置产生错误时关闭连接

标签 python rabbitmq amqp pika python-pika

我正在使用 RabbitMQ 生产者将长时间运行的任务(30 分钟以上)发送给消费者。问题是当与服务器的连接关闭并且未确认的任务重新排队时,消费者仍在处理任务。

通过研究我了解到 heartbeatincreased connection timeout可以用来解决这个问题。这两种解决方案在尝试时都会引发错误。在阅读类似帖子的答案时,我还了解到自发布答案以来,RabbitMQ 已经实现了许多更改(例如,默认心跳超时已从 RabbitMQ 3.5.5 之前的 580 更改为 60)。

指定心跳和阻塞连接超时时:

credentials = pika.PlainCredentials('user', 'password')
parameters = pika.ConnectionParameters('XXX.XXX.XXX.XXX', port, '/', credentials, blocked_connection_timeout=2000)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()

显示以下错误:

TypeError: __init__() got an unexpected keyword argument 'blocked_connection_timeout'

在连接参数中指定 heartbeat_interval=1000 时会显示类似的错误:TypeError: __init__() got an unexpected keyword argument 'heartbeat_interval'

同样对于 socket_timeout = 1000 会显示以下错误:TypeError: __init__() got an unexpected keyword argument 'socket_timeout'

我在 Ubuntu 14.04 上运行 RabbitMQ 3.6.1、pika 0.10.0 和 python 2.7。

  1. 为什么上述方法会产生错误?
  2. 可以在有长时间运行的连续任务的情况下使用心跳方法吗?例如,在执行需要 30 分钟以上的大型数据库连接时可以使用心跳吗?我赞成心跳方法,因为很多时候很难判断数据库连接等任务需要多长时间。

我已经通读了类似问题的答案

更新:正在运行code from the pika documentation产生相同的错误。

最佳答案

我的系统遇到了与您看到的相同的问题,即在执行非常长的任务时连接中断。

如果您的网络设置强制断开空闲 TCP/IP 连接,心跳可能有助于保持连接有效。但是,如果不是这种情况,则更改心跳也无济于事。

更改连接超时根本无济于事。此设置仅在最初创建连接时使用。

I am using a RabbitMQ producer to send long running tasks (30 mins+) to a consumer. The problem is that the consumer is still working on a task when the connection to the server is closed and the unacknowledged task is requeued.

这有两个原因,您已经遇到了这两个原因:

  1. 连接随机断开,即使在最好的情况下也是如此
  2. 由于重新排队的消息而重新启动进程可能会导致问题

部署 RabbitMQ 代码的任务时间范围从不到一秒到几个小时不等,我发现立即确认消息并使用状态消息更新系统最适合非常长的任务,例如这样。

您需要有一个记录系统(可能带有数据库)来跟踪给定作业的状态。

当消费者选择消息并启动流程时,它应该立即确认消息并将“已启动”状态消息发送到记录系统。

当流程完成时,发送另一条消息表示它已完成。

这不会解决连接断开的问题,但无论如何都无法 100% 解决该问题。相反,它将防止在连接断开时发生消息重新排队问题。

不过,此解决方案确实引入了另一个问题:当长时间运行的进程崩溃时,您如何恢复工作?

基本的答案是使用工作的记录系统(您的数据库)状态来告诉您需要重新开始该工作。当应用程序启动时,检查数据库以查看是否有未完成的工作。如果有,以任何适当的方式恢复或重新启动该工作。

关于python - RabbitMQ 在处理长时间运行的任务和超时设置产生错误时关闭连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36123006/

相关文章:

php - 用于大量 MySql 插入和更新的 RabbitMQ?

RabbitMQ - 在发布者端查找消息已被消费者确认

centos - 安装 PHP amqp 扩展和 librabbitMQ 的问题

java - 编写无代理 AMQP 到 MQTT 适配器

python - 使用 OSMnx 的等时线

python - 根据百分位数绘制直方图

python - 如何在 tensorflow 中收集带有索引的元素

node.js - Node-amqp 和 socket.io 奇怪的行为

python - Python 中长度为 n 的一系列空列表?

RabbitMQ - 奇怪的disk_free