clojure - 如何在 Langohr 中容忍 RabbitMQ 重启?

标签 clojure rabbitmq langohr

我们有从 Rabbit 队列中读取的 Clojure 代码。我们愿意容忍 RabbitMQ 服务器短暂停机的情况,例如在重新启动的情况下 ( sudo service rabbitmq-server restart )。

似乎有 some provision for reconnecting在朗格。我们修改了示例 clojurewerkz.langohr.examples.recovery.example1 ( Gist here )。与已发布示例的细微差别包括连接参数,以及 lb/publish 的删除调用(因为我们使用外部源填充数据)。

我们可以成功消费队列中的数据并等待更多消息。但是,当我们重新启动 RMQ 时(在托管 RabbitMQ 的虚拟机上通过上述 sudo 命令),会抛出以下异常:

Caught an exception during connection recovery!
java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:378)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:545)
    at com.novemberain.langohr.Connection.recoverConnection(Connection.java:166)
    at com.novemberain.langohr.Connection.beginAutomaticRecovery(Connection.java:115)
    at com.novemberain.langohr.Connection.access$000(Connection.java:18)
    at com.novemberain.langohr.Connection$1.shutdownCompleted(Connection.java:93)
    at com.rabbitmq.client.impl.ShutdownNotifierComponent.notifyListeners(ShutdownNotifierComponent.java:75)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:573)
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.io.EOFException
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:321)
    ... 8 more
Caused by: java.io.EOFException
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:273)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:131)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:533)

Langohr 提供的预期重启机制在启动时似乎很可能会中断。在这些“硬”重启的情况下,是否有首选的替代模式?或者,我想我们必须实现连接监控并自己重试。任何建议将是最受欢迎的。

最佳答案

我们曾经看到过这样的堆栈跟踪,但在 Langohr 2.9.0 中我们不再看到它们。重启后,我们的 clojure 客户端重新连接,消息再次开始流动。

我们使用默认设置,打开连接和拓扑覆盖,如下所示:

(infof "Automatic recovery enabled? %s" (rmq/automatic-recovery-enabled? connection))
(infof "Topology recovery enabled? %s" (rmq/automatic-topology-recovery-enabled? connection))

关于clojure - 如何在 Langohr 中容忍 RabbitMQ 重启?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23278332/

相关文章:

java - RabbitMQ 没有选择正确的消费者

clojure - ClassCastException java.lang.Long 不能转换为 clojure.lang.IFn

clojure - 在调用转换时,集合值会传递多少个归约函数?

function - Clojure - 测试函数表达式的相等性?

clojure - Clojure 源中的父 eval(阅读器)函数?

python - Python Kombu-阻止

data-structures - RabbitmQ 使用什么算法进行主题交换的模式匹配