如果我们使用 spring amqp 客户端,它会自动禁用 amqp 客户端自动恢复属性,并使用自己的自动恢复机制。因此,如果我重新启动负载均衡器默认后面的集群节点之一,spring amqp 自动恢复对集群不起作用,因为 amqp 连接和 channel 已连接的节点重新启动,spring amqp 不会恢复并再次重新连接不同的节点。但是,如果已经连接并且消耗队列保留节点不会影响重新启动节点。
例如,我们的 RabbitMQ 集群由 AWS 上 ELB 后面的三个节点组成,内部使用 aws_peer_discovery 插件。我们的集群策略是/HA Policy .* all {"ha-mode":"all","ha-sync-mode":"automatic","queue-master-locator":"random","queue-mode":"lazy"} 0
我们的消费者客户端在 ECS 上运行,有 4 个任务已连接并使用我们的持久和镜像队列。这意味着只有一个持久和镜像队列。使用 4 个消费者任务进行消费。
使用默认 Spring AMQP CachingConnectionFactory 的消费者如下代码块
@Bean
public CachingConnectionFactory cachingConnectionFactory(ConnectionFactory connectionFactory) {
return new CachingConnectionFactory(connectionFactory);
}
@Bean
public CachingConnectionFactory cachingConnectionFactory(ConnectionFactory connectionFactory) {
return new CachingConnectionFactory(connectionFactory);
}
在这种情况下,当重新启动节点(使用命令行 systemctl restartrabbitmq)时,连接到此重新启动的节点的消费者无法正确自动恢复,我看到尝试连接到集群并且已经连接但没有消耗,所以当我执行rabbitmqctl list_connection时显示所有连接正常,但rabbitmqctl list_consumers向我显示之前已连接到重新启动器节点并再次重新启动节点的缺席消费者,但消费者没有重新连接另一个节点或同一节点并且没有消耗当前队列。
如果我将 spring ampq 客户端连接更改为以下行,这意味着 spring ampq 不,我们正在使用默认的 amqp-client 自动恢复机制,不使用 spring 本身,并且此时再次测试相同的场景,我们不会收到相同的错误,因此当我重新启动节点测试时,我们的消费者重新连接另一个节点并继续消费。
@Bean
public CachingConnectionFactory cachingConnectionFactory(ConnectionFactory connectionFactory) {
CachingConnectionFactory cachingConnectionFactory =
new CachingConnectionFactory(connectionFactory);
cachingConnectionFactory.getRabbitConnectionFactory().setAutomaticRecoveryEnabled(true);
return cachingConnectionFactory;
为什么 spring-amqp 无法正常工作。我想知道我们是否在负载均衡器后面使用集群,我们必须使用默认的 ampq-client 自动恢复机制,或者我们的 spring-amqp 客户端代码块错误或不存在??
最佳答案
您可以配置可在监听器容器配置中应用的Recovery BackOff
。我建议使用指数退避。对于发布者交互,它是rabbitTemplate的退避策略配置。
关于java - Spring AMQP客户端自动重新连接但不消耗恢复,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57291992/