我已将 ConcurrentMessageListenerContainer
配置为并发数为 3,以便从 3 个分区进行消费,还配置了 KafkaTemplate
以及 ProducerFactory,将消息生成到 3 个分区。 Spring bean 配置有 destroy-method,以在应用程序关闭时调用消费者监听器容器和生产者的 stop()
。关闭后,日志如下所示,看起来消费者已停止,但没有生产者是否停止的信息。
[kafkaContainer-0-C-1] INFO org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer - Consumer stopped
[kafkaContainer-2-C-1] INFO org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer - Consumer stopped
[kafkaContainer-1-C-1] INFO org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer - Consumer stopped
但应用程序并未完全关闭。执行 jstack 命令显示 3 个 ThreadPoolTaskScheduler 仍在后台运行。 jstack命令的输出片段:
"ThreadPoolTaskScheduler-1" #74 prio=5 os_prio=0 tid=0x00007f8564f23800 nid=0x77e5 waiting on condition [0x00007f8525292000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x00000000ec8f0808> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1081) at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809) at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
上面的 block 是为每个MessageListener打印的,我的意思是如果我将并发设置为5,那么jstack输出包含上面的消息5次。因此,我认为即使消费者监听器被记录为关闭,它在内部也不会完全关闭。
我是否遗漏了正确关闭生产者和消费者的内容?
最佳答案
您没有说明您正在使用哪个版本。
此问题已在 2.1.0、2.0.2 和 1.3.2 中修复。
您不需要通过 destroy()
方法stop()
容器;上下文将在关闭时停止消费者。
关于apache-kafka - Spring Kafka 无法正常关闭,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47853783/