我有一个简单的 Java Kafka 消费者。如果 Kafka 代理不可用,我试图捕获异常。我需要它来中断线程。
我有这样的代码:
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties());
kafkaConsumer.subscribe(Arrays.asList(topic));
try {
ConsumerRecords<String, String> records = kafkaConsumer.poll(500);
// records handling
} catch(Exception e) {
System.out.println(e.getMessage());
}finally {
kafkaConsumer.close();
}
如果 Kafka 服务器关闭,我不会捕获任何异常,但日志中会显示以下消息:
18/03/28 13:33:39 WARN clients.NetworkClient: [Consumer clientId=consumer-3, groupId=JAVA] Connection to node -1 could not be established. Broker may not be available.
18/03/28 13:33:40 WARN clients.NetworkClient: [Consumer clientId=consumer-1, groupId=JAVA] Connection to node -1 could not be established. Broker may not be available.
有办法在我的线程中处理它吗?
最佳答案
这样的东西可以工作:
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
System.out.println("Starting exit...");
consumer.wakeup(); 1
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
你做consumer.wakeup()
中断当前消费者的操作。
mainThread.join()
放在那里是为了确保主线程实际完成并且在唤醒后的处理过程中不会被关闭。请记住,shutdownHook 还负责处理中断,而不仅仅是普通的程序关闭。
关于java - 如何检测java中的消费者是否无法使用kafka代理?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49532645/