2020-05-14 19:32:11.238 INFO 19880 --- [on(4)-127.0.0.1] o.s.c.support.DefaultLifecycleProcessor:无法在 30000 超时内关闭阶段值为 2147483547 的 1 个 bean:[org .springframework.kafka.config.internalKafkaListenerEndpointRegistry]
最佳答案
默认情况下,Spring 要求每个 SmartLifecycle
阶段的 Bean 在 30 秒内停止;您可以通过添加以下 bean 来更改该行为:
@Bean
public DefaultLifecycleProcessor lifecycleProcessor() {
DefaultLifecycleProcessor lp = new DefaultLifecycleProcessor();
lp.setTimeoutPerShutdownPhase(120_000);
return lp;
}
编辑
在容器的监听器正在处理批处理时停止容器不会影响批处理的处理;监听器线程不会被容器杀死;但是,默认情况下,容器将在 10 秒后发布容器停止事件(容器属性 shutDownTimeout
),即使监听器实际上尚未完成。
如果您担心生命周期处理器会在批处理中终止线程并且不想增加其超时,则可以通过结合暂停使用者和监听事件来执行正常关闭。
这是一个例子:
@SpringBootApplication
public class So61799727Application {
public static void main(String[] args) {
SpringApplication.run(So61799727Application.class, args);
}
@KafkaListener(id = "so61799727", topics = "so61799727", concurrency = "3")
public void listen(List<String> in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so61799727").partitions(3).replicas(1).build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template,
KafkaListenerEndpointRegistry registry) {
return args -> {
sendTen(template);
System.out.println("Hit enter to pause container");
System.in.read();
registry.getListenerContainer("so61799727").pause();
};
}
public static void sendTen(KafkaTemplate<String, String> template) {
IntStream.range(0, 10).forEach(i -> template.send("so61799727", "foo" + i));
}
}
@Component
class Eventer {
privaate final KafkaTemplate<String, String> template;
private final AtomicInteger paused = new AtomicInteger();
Eventer(KafkaTemplate<String, String> template) {
this.template = template;
}
@EventListener
public void paused(ConsumerPausedEvent event) {
System.out.println(event);
if (this.paused.incrementAndGet() ==
event.getContainer(ConcurrentMessageListenerContainer.class).getConcurrency()) {
System.out.println("All containers paused");
So61799727Application.sendTen(this.template);
}
}
@EventListener
public void idle(ListenerContainerIdleEvent event) {
System.out.println(event);
}
}
spring.kafka.listener.idle-event-interval=5000
spring.kafka.listener.type=batch
spring.kafka.producer.properties.linger.ms=50
关于java - @KafkaListener 正常关闭,批处理 Kakfa 监听器不工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61799727/