apache-kafka - Spring Kafka 无法正常关闭

标签 apache-kafka spring-kafka

我已将 ConcurrentMessageListenerContainer 配置为并发数为 3,以便从 3 个分区进行消费,还配置了 KafkaTemplate 以及 ProducerFactory,将消息生成到 3 个分区。 Spring bean 配置有 destroy-method,以在应用程序关闭时调用消费者监听器容器和生产者的 stop()。关闭后,日志如下所示,看起来消费者已停止,但没有生产者是否停止的信息。

[kafkaContainer-0-C-1] INFO org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer - Consumer stopped
[kafkaContainer-2-C-1] INFO org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer - Consumer stopped
[kafkaContainer-1-C-1] INFO org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer - Consumer stopped

但应用程序并未完全关闭。执行 jstack 命令显示 3 个 ThreadPoolTask​​Scheduler 仍在后台运行。 jstack命令的输出片段:

"ThreadPoolTaskScheduler-1" #74 prio=5 os_prio=0 tid=0x00007f8564f23800 nid=0x77e5 waiting on condition [0x00007f8525292000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000ec8f0808> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081) at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809) at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)

上面的 block 是为每个MessageListener打印的,我的意思是如果我将并发设置为5,那么jstack输出包含上面的消息5次。因此,我认为即使消费者监听器被记录为关闭,它在内部也不会完全关闭。

我是否遗漏了正确关闭生产者和消费者的内容?

最佳答案

您没有说明您正在使用哪个版本。

此问题已在 2.1.0、2.0.2 和 1.3.2 中修复。

您不需要通过 destroy() 方法stop() 容器;上下文将在关闭时停止消费者。

关于apache-kafka - Spring Kafka 无法正常关闭,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47853783/

相关文章:

java - 使用 KafkaTransactionManager 在事务 KafkaTemplate 中进行基于事件的提交

java - Spring Kafka Error Handler如何避免无限循环

java - 在 Kafka 中重播消息

docker-compose + 如何设置 ramdisk 而不是卷

apache-kafka - Kafka 保留期之谜

ubuntu - 无法将主管与 Apache Kafka 一起使用

apache-kafka - 使用 react 器 Kafka 从主题读取消息并将消息批量写入 REST 端点

spring-boot - Spring Boot Kafka 监听器并发

apache-kafka - Kafka 经纪人不断 ISR 缩小和扩大?

java - 使用额外的第三方插件运行 kafka 服务器,用于统计数据收集、日志记录等