我正在使用 RabbitMQ 生产者将长时间运行的任务(30 分钟以上)发送给消费者。问题是当与服务器的连接关闭并且未确认的任务重新排队时,消费者仍在处理任务。
通过研究我了解到 heartbeat或 increased connection timeout可以用来解决这个问题。这两种解决方案在尝试时都会引发错误。在阅读类似帖子的答案时,我还了解到自发布答案以来,RabbitMQ 已经实现了许多更改(例如,默认心跳超时已从 RabbitMQ 3.5.5 之前的 580 更改为 60)。
指定心跳和阻塞连接超时时:
credentials = pika.PlainCredentials('user', 'password')
parameters = pika.ConnectionParameters('XXX.XXX.XXX.XXX', port, '/', credentials, blocked_connection_timeout=2000)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
显示以下错误:
TypeError: __init__() got an unexpected keyword argument 'blocked_connection_timeout'
在连接参数中指定 heartbeat_interval=1000
时会显示类似的错误:TypeError: __init__() got an unexpected keyword argument 'heartbeat_interval'
同样对于 socket_timeout = 1000
会显示以下错误:TypeError: __init__() got an unexpected keyword argument 'socket_timeout'
我在 Ubuntu 14.04 上运行 RabbitMQ 3.6.1、pika 0.10.0 和 python 2.7。
- 为什么上述方法会产生错误?
- 可以在有长时间运行的连续任务的情况下使用心跳方法吗?例如,在执行需要 30 分钟以上的大型数据库连接时可以使用心跳吗?我赞成心跳方法,因为很多时候很难判断数据库连接等任务需要多长时间。
我已经通读了类似问题的答案
更新:正在运行code from the pika documentation产生相同的错误。
最佳答案
我的系统遇到了与您看到的相同的问题,即在执行非常长的任务时连接中断。
如果您的网络设置强制断开空闲 TCP/IP 连接,心跳可能有助于保持连接有效。但是,如果不是这种情况,则更改心跳也无济于事。
更改连接超时根本无济于事。此设置仅在最初创建连接时使用。
I am using a RabbitMQ producer to send long running tasks (30 mins+) to a consumer. The problem is that the consumer is still working on a task when the connection to the server is closed and the unacknowledged task is requeued.
这有两个原因,您已经遇到了这两个原因:
- 连接随机断开,即使在最好的情况下也是如此
- 由于重新排队的消息而重新启动进程可能会导致问题
部署 RabbitMQ 代码的任务时间范围从不到一秒到几个小时不等,我发现立即确认消息并使用状态消息更新系统最适合非常长的任务,例如这样。
您需要有一个记录系统(可能带有数据库)来跟踪给定作业的状态。
当消费者选择消息并启动流程时,它应该立即确认消息并将“已启动”状态消息发送到记录系统。
当流程完成时,发送另一条消息表示它已完成。
这不会解决连接断开的问题,但无论如何都无法 100% 解决该问题。相反,它将防止在连接断开时发生消息重新排队问题。
不过,此解决方案确实引入了另一个问题:当长时间运行的进程崩溃时,您如何恢复工作?
基本的答案是使用工作的记录系统(您的数据库)状态来告诉您需要重新开始该工作。当应用程序启动时,检查数据库以查看是否有未完成的工作。如果有,以任何适当的方式恢复或重新启动该工作。
关于python - RabbitMQ 在处理长时间运行的任务和超时设置产生错误时关闭连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36123006/