java - 关闭 Kafka 消费者

标签 java spring apache-kafka

我正在阅读此 link 中 Kafka 高级消费者的详细信息并看到以下声明 -

In practice, a more common pattern is to use sleep indefinitely and use a shutdown hook to trigger clean shutdown.

是否有执行此操作的示例或可以提供帮助的指示?

最佳答案

这将是一个无限循环的例子

public void run() {
    try {
      consumer.subscribe(topics);
      while (true) {
          ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        //do something
      }
    } catch (WakeupException e) {
      // do nothing we are shutting down 
    } finally {
      consumer.close();
    }
  }

  public void shutdown() {
    consumer.wakeup();
  }
}

这将是您的关机钩子(Hook)。

@PostConstruct
    private void init(){
        addShutdownHook(); 
    }

 private void addShutdownHook(){
   Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

            @Override
            public void run() {
                shutdown();
            }
        }));
    }

关于java - 关闭 Kafka 消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35566011/

相关文章:

java - SparkJava Web浏览器下载html文件而不是在部署war文件时显示

java - 无法使用 Google App Engine (GAE) 运行 Spring Data Rest

ssl - 在 Apache Kafka 中禁用 TLS 1.0、TLS 1.1

java - "SELECT... WHERE column > ?1"- 那是什么?

java - 创建名称为 'mvcValidator' 的 bean 时出错

java - kafka-streams - TopologyBuilder/KStreamBuilder 对象是否可重用?

apache-kafka - 生成消息时是否必须指定所有 Kafka broker IP

java - 静态方法参数的同步

java - BigInteger.intValue()>1 给出错误的 boolean 值

java - 如何从我的 Android 应用程序的 firebase 中的键中获取值?