java - SimpleMessageListenerContainer stops consuming from RabbitMQ when the master node is restarted

Tags: java rabbitmq spring-amqp spring-rabbit

When RabbitMQ runs as a cluster and the master node is restarted, org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer fails to recover and no longer consumes messages from RabbitMQ.

spring-rabbit version: 1.3.3.RELEASE

RabbitMQ version: 3.3.1

When the cluster's master node is restarted, we get the following exceptions:


2014-10-03 17:36:00,136 [WARN] SimpleMessageListenerContainer(1099): Consumer raised exception, processing can restart if the connection factory supports it 
com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method(reply-code=320, reply-text=CONNECTION_FORCED - broker forced connection closure with reason 'shutdown', class-id=0, method-id=0)
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:715) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:705) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQConnection.handleConnectionClose(AMQConnection.java:660) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQConnection.processControlCommand(AMQConnection.java:615) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$1.processAsync(AMQConnection.java:107) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:540) ~[amqp-client-3.3.1.jar:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_67]
2014-10-03 17:36:00,136 [INFO] SimpleMessageListenerContainer(1090): Restarting Consumer: tags=[[amq.ctag-5pEx0wJyFWB1DNqUlN_51w]], channel=Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://guest@127.0.0.1:5673/,1), acknowledgeMode=AUTO local queue size=0 
2014-10-03 17:36:06,459 [WARN] SimpleMessageListenerContainer(1099): Consumer raised exception, processing can restart if the connection factory supports it 
org.springframework.amqp.AmqpIOException: java.io.IOException
    at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:63) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.RabbitAccessor.convertRabbitAccessException(RabbitAccessor.java:112) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:841) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:820) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.core.RabbitAdmin.initialize(RabbitAdmin.java:391) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.core.RabbitAdmin$11.onCreate(RabbitAdmin.java:323) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.CompositeConnectionListener.onCreate(CompositeConnectionListener.java:32) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createConnection(CachingConnectionFactory.java:361) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:309) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:283) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:276) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$600(CachingConnectionFactory.java:69) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:614) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createChannel(ConnectionFactoryUtils.java:85) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:134) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:67) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:363) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:963) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_67]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_67]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_67]
Caused by: java.io.IOException: null
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:779) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.ChannelN.queueDeclare(ChannelN.java:61) ~[amqp-client-3.3.1.jar:na]
    at org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl.queueDeclare(PublisherCallbackChannelImpl.java:362) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_67]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_67]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_67]
    at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_67]
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:536) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at com.sun.proxy.$Proxy8.queueDeclare(Unknown Source) ~[na:na]
    at org.springframework.amqp.rabbit.core.RabbitAdmin.declareQueues(RabbitAdmin.java:458) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.core.RabbitAdmin.access$200(RabbitAdmin.java:54) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.core.RabbitAdmin$12.doInRabbit(RabbitAdmin.java:395) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:835) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    ... 18 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - home node 'hare@mbp15Q0F1G3' of durable queue 'myqueue' in vhost '/' is down or inaccessible, class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118) ~[amqp-client-3.3.1.jar:na]
    ... 31 common frames omitted
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - home node 'hare@mbp15Q0F1G3' of durable queue 'myqueue' in vhost '/' is down or inaccessible, class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) ~[amqp-client-3.3.1.jar:na]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550) ~[amqp-client-3.3.1.jar:na]
    ... 1 common frames omitted
2014-10-03 17:36:06,460 [INFO] SimpleMessageListenerContainer(1090): Restarting Consumer: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0 
2014-10-03 17:36:06,465 [WARN] BlockingQueueConsumer(446): Failed to declare queue:myqueue 
2014-10-03 17:36:06,466 [WARN] BlockingQueueConsumer(388): Queue declaration failed; retries left=2 
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[myqueue]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:449) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:382) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:963) [spring-rabbit-1.3.3.RELEASE.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_67]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_67]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_67]
2014-10-03 17:36:11,469 [WARN] BlockingQueueConsumer(446): Failed to declare queue:myqueue 
2014-10-03 17:36:11,470 [WARN] BlockingQueueConsumer(388): Queue declaration failed; retries left=1 
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[myqueue]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:449) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:382) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:963) [spring-rabbit-1.3.3.RELEASE.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_67]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_67]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_67]
2014-10-03 17:36:16,475 [WARN] BlockingQueueConsumer(446): Failed to declare queue:myqueue 
2014-10-03 17:36:16,475 [WARN] BlockingQueueConsumer(388): Queue declaration failed; retries left=0 
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[myqueue]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:449) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:382) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:963) [spring-rabbit-1.3.3.RELEASE.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_67]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_67]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_67]
2014-10-03 17:36:21,479 [WARN] BlockingQueueConsumer(446): Failed to declare queue:myqueue 
2014-10-03 17:36:21,480 [ERROR] SimpleMessageListenerContainer(1026): Consumer received fatal exception on startup 
org.springframework.amqp.rabbit.listener.FatalListenerStartupException: Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it.
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:407) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:963) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_67]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_67]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_67]
Caused by: org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[myqueue]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:449) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:382) ~[spring-rabbit-1.3.3.RELEASE.jar:na]
    ... 4 common frames omitted
2014-10-03 17:36:21,480 [ERROR] SimpleMessageListenerContainer(1085): Stopping container from aborted consumer 
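
Reading the timestamps in the log, the consumer appears to attempt the passive queue declaration four times, about five seconds apart ("retries left=2", "1", "0"), before the container stops. The sketch below is a simplified model of that pattern, not Spring AMQP's actual code; the class name, method name, and parameters are hypothetical and exist only for illustration.

```java
import java.util.function.BooleanSupplier;

// Simplified model of the retry pattern visible in the log.
// NOT the Spring AMQP implementation; all names here are hypothetical.
final class DeclarationRetrySketch {

    /** Result of trying to start a consumer. */
    enum Outcome { STARTED, FATAL }

    /**
     * Calls passiveDeclare until it succeeds or the retries are used up.
     * One initial attempt is made, followed by at most `retries` further attempts,
     * with a pause of `intervalMillis` between attempts.
     */
    static Outcome start(BooleanSupplier passiveDeclare, int retries, long intervalMillis) {
        int retriesLeft = retries;
        while (true) {
            if (passiveDeclare.getAsBoolean()) {
                return Outcome.STARTED;          // queue is reachable, consumer can run
            }
            if (retriesLeft == 0) {
                return Outcome.FATAL;            // corresponds to "Stopping container from aborted consumer"
            }
            retriesLeft--;
            System.out.println("Queue declaration failed; retries left=" + retriesLeft);
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Outcome.FATAL;            // assumption: an interrupted start is treated as aborted
            }
        }
    }
}
```

With `retries = 3` and `intervalMillis = 5000`, a queue that stays unavailable is tried four times over roughly 15 seconds, which matches the span from 17:36:06 to 17:36:21 in the log. If the home node of a non-mirrored queue takes longer than that to come back, the model ends in `FATAL`.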

Best answer

channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - home node 'hare@mbp15Q0F1G3' of durable queue 'myqueue' in vhost '/' is down or inaccessible, class-id=50, method-id=10)

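It sounds like you have not configured the queue for HA; only mirrored queues are replicated. A non-mirrored durable queue lives solely on its home node, so while that node is down, declaring the queue on another node fails with the NOT_FOUND error above.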
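
In RabbitMQ 3.x, mirroring is enabled through a policy, for example `rabbitmqctl set_policy ha-all "^myqueue$" '{"ha-mode":"all"}'`. As a minimal sketch, the same policy could also be applied from Java through the management plugin's HTTP API. Everything specific here is an assumption rather than part of the original question: that the management plugin is enabled, that it listens on `http://127.0.0.1:15672`, that the policy is named `ha-all`, and that the supplied credentials are allowed to set policies.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Sketch only: builds and sends a "mirror this queue on all nodes" policy.
// Host, port, policy name and credentials are assumptions chosen for illustration.
final class HaPolicySketch {

    /** URL of the policy resource, e.g. .../api/policies/%2F/ha-all for vhost "/". */
    static String policyUrl(String managementBase, String vhost, String policyName) {
        return managementBase + "/api/policies/" + encode(vhost) + "/" + encode(policyName);
    }

    /** JSON body of the policy. The pattern is not JSON-escaped, so keep it free of quotes and backslashes. */
    static String policyBody(String queuePattern) {
        return "{\"pattern\":\"" + queuePattern + "\","
                + "\"definition\":{\"ha-mode\":\"all\"},"
                + "\"apply-to\":\"queues\"}";
    }

    /** Sends the policy with HTTP PUT and basic authentication; returns the HTTP status code. */
    static int apply(String url, String body, String user, String password) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("PUT");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json");
        String credentials = Base64.getEncoder()
                .encodeToString((user + ":" + password).getBytes(StandardCharsets.UTF_8));
        conn.setRequestProperty("Authorization", "Basic " + credentials);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }
        return conn.getResponseCode();
    }

    // Form-style encoding: turns "/" into "%2F", which is what the default vhost needs.
    // Names containing spaces would need proper path encoding instead.
    private static String encode(String s) {
        return URLEncoder.encode(s, StandardCharsets.UTF_8);
    }
}
```

A call such as `HaPolicySketch.apply(HaPolicySketch.policyUrl("http://127.0.0.1:15672", "/", "ha-all"), HaPolicySketch.policyBody("^myqueue$"), user, password)` would then ask the broker to mirror `myqueue` across all cluster nodes, so that a surviving mirror can take over when the queue's master node restarts.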

Source: this question, "SimpleMessageListenerContainer stops consuming from RabbitMQ when the master node is restarted", comes from Stack Overflow: https://stackoverflow.com/questions/26186533/