java - 如何检测java中的消费者是否无法使用kafka代理?

标签 java apache-kafka kafka-consumer-api

我有一个简单的 Java Kafka 消费者。如果 Kafka 代理不可用,我试图捕获异常。我需要它来中断线程。

我有这样的代码:

KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties());
kafkaConsumer.subscribe(Arrays.asList(topic));
try {
    ConsumerRecords<String, String> records = kafkaConsumer.poll(500);
    // records handling
} catch(Exception e) {
    System.out.println(e.getMessage());
}finally {
    kafkaConsumer.close();
}

如果 Kafka 服务器关闭,我不会捕获任何异常,但日志中会显示以下消息:

18/03/28 13:33:39 WARN clients.NetworkClient: [Consumer clientId=consumer-3, groupId=JAVA] Connection to node -1 could not be established. Broker may not be available.
18/03/28 13:33:40 WARN clients.NetworkClient: [Consumer clientId=consumer-1, groupId=JAVA] Connection to node -1 could not be established. Broker may not be available.

有办法在我的线程中处理它吗?

最佳答案

这样的东西可以工作:

Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                System.out.println("Starting exit...");
                consumer.wakeup(); 1
                try {
                    mainThread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

你做consumer.wakeup()中断当前消费者的操作。

mainThread.join()放在那里是为了确保主线程实际完成并且在唤醒后的处理过程中不会被关闭。请记住,shutdownHook 还负责处理中断,而不仅仅是普通的程序关闭。

关于java - 如何检测java中的消费者是否无法使用kafka代理?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49532645/

相关文章:

rest - Kafka Rest API KSQL 查询永远等待并挂起

java - 如何从ConsumerGroup中的所有分区获取最后一条日志

JButton 上的 Java Actionlistener 没有名称?

java - 使用 Jmeter 从 csv 文件创建包含多个项目(在一个 SOAP/XML 消息中)的 xml 列表

java - 在 servlet 的 init-param 类中使用 spring Autowiring

java - 逗号作为Java中的小数点分隔符

testing - 如何在 Quarkus 测试中禁用 Kafka?

python - 在 python 中保持 Kafka 消费者存活的最佳实践是什么?

apache-kafka - 如何使用 Kafka Console Consumer 在两个时间戳之间使用消息

apache-kafka - Kafka 一次性生产者消费者