我正在阅读此 link 中 Kafka 高级消费者的详细信息并看到以下声明 -
In practice, a more common pattern is to use sleep indefinitely and use a shutdown hook to trigger clean shutdown.
是否有执行此操作的示例或可以提供帮助的指示?
最佳答案
这将是一个无限循环的例子
public void run() {
try {
consumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
//do something
}
} catch (WakeupException e) {
// do nothing we are shutting down
} finally {
consumer.close();
}
}
public void shutdown() {
consumer.wakeup();
}
}
这将是您的关机钩子(Hook)。
@PostConstruct
private void init(){
addShutdownHook();
}
private void addShutdownHook(){
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
shutdown();
}
}));
}
关于java - 关闭 Kafka 消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35566011/