java - @KafkaListener 正常关闭,批处理 Kakfa 监听器不工作

标签 java apache-kafka spring-kafka

2020-05-14 19:32:11.238 INFO 19880 --- [on(4)-127.0.0.1] o.s.c.support.DefaultLifecycleProcessor:无法在 30000 超时内关闭阶段值为 2147483547 的 1 个 bean:[org .springframework.kafka.config.internalKafkaListenerEndpointRegistry]

最佳答案

默认情况下,Spring 要求每个 SmartLifecycle 阶段的 Bean 在 30 秒内停止;您可以通过添加以下 bean 来更改该行为:

@Bean
public DefaultLifecycleProcessor lifecycleProcessor() {
    DefaultLifecycleProcessor lp = new DefaultLifecycleProcessor();
    lp.setTimeoutPerShutdownPhase(120_000);
    return lp;
}

编辑

在容器的监听器正在处理批处理时停止容器不会影响批处理的处理;监听器线程不会被容器杀死;但是,默认情况下,容器将在 10 秒后发布容器停止事件(容器属性 shutDownTimeout),即使监听器实际上尚未完成。

如果您担心生命周期处理器会在批处理中终止线程并且不想增加其超时,则可以通过结合暂停使用者和监听事件来执行正常关闭。

这是一个例子:

@SpringBootApplication
public class So61799727Application {

    public static void main(String[] args) {
        SpringApplication.run(So61799727Application.class, args);
    }


    @KafkaListener(id = "so61799727", topics = "so61799727", concurrency = "3")
    public void listen(List<String> in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so61799727").partitions(3).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template,
            KafkaListenerEndpointRegistry registry) {

        return args -> {
            sendTen(template);
            System.out.println("Hit enter to pause container");
            System.in.read();
            registry.getListenerContainer("so61799727").pause();
        };
    }

    public static void sendTen(KafkaTemplate<String, String> template) {
        IntStream.range(0, 10).forEach(i -> template.send("so61799727", "foo" + i));
    }

}
@Component
class Eventer {

    privaate final KafkaTemplate<String, String> template;

    private final AtomicInteger paused = new AtomicInteger();

    Eventer(KafkaTemplate<String, String> template) {
        this.template = template;
    }

    @EventListener
    public void paused(ConsumerPausedEvent event) {
        System.out.println(event);
        if (this.paused.incrementAndGet() ==
                event.getContainer(ConcurrentMessageListenerContainer.class).getConcurrency()) {
            System.out.println("All containers paused");
            So61799727Application.sendTen(this.template);
        }
    }

    @EventListener
    public void idle(ListenerContainerIdleEvent event) {
        System.out.println(event);
    }

}
spring.kafka.listener.idle-event-interval=5000
spring.kafka.listener.type=batch
spring.kafka.producer.properties.linger.ms=50

关于java - @KafkaListener 正常关闭,批处理 Kakfa 监听器不工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61799727/

相关文章:

java - 单链表转循环链表

docker - 卡夫卡生产者抛出 'TimeoutException: Batch Expired'异常

java - KafkaProducer 未生成> 1 MB 的消息到主题

spring-kafka - @RetryableTopic 在与 topicPartitions 一起使用来重置偏移量时显示奇怪的行为 - spring kafka

java - 在 spring boot 中启用 ssl 的最佳实践是什么?

java - 环境变量和@Value 不能在 Spring Boot 上协同工作

java - KeyListener - 我需要在主程序中调用 keyPressed 方法吗?

Avro 与 Protobuf 的性能指标

json - Kafka Stream 从 JSON 到 Avro

spring-boot - 停止使用 Stream 监听器的消息