我正在使用 ConcurrentMessageListenerContainer
并将自动提交设置为 false 以使用来自主题的消息并写入数据库。如果数据库已关闭,我需要停止容器处理轮询中的当前记录并且不执行下一步 poll()
。我已经实现了 DataSourceHealthIndicator
,在检查数据库状态为 UP 后,我想再次重新启动我的容器以处理剩余的记录。
关于如何停止处理剩余记录并停止容器的任何建议,我已尝试使用 consumer.close()
。但它并没有停止这个过程,并且一直在抛出消费者已经关闭。
最佳答案
调用container.stop()
。如果您使用的是 @KafkaListener
,请在监听器容器注册表上调用 stop()
,这将停止所有已注册的容器。
编辑
automatically stopping the container 的错误处理程序从 2.1 版本开始可用;在撰写本文时,当前版本为 2.2.3。
关于spring-kafka - Spring Kafka Stop 容器出现异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47190007/