java - 永远运行kafka消费者(新消费者API)

标签 java apache-kafka kafka-consumer-api

我在 Apache Kafka 上构建了一个排队系统。应用程序将向特定的 Kafka 主题生成消息,并且在消费者端我必须使用为该主题生成的所有记录。
我使用新的 Java Consumer Api 编写了 Consumer。 代码看起来像

  Properties props = new Properties();  
                     props.put("bootstrap.servers", kafkaBrokerIp+":9092");  
                     props.put("group.id",groupId);  
                     props.put("enable.auto.commit", "true");
                     props.put("session.timeout.ms", "30000");
                     props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer(props);
                     consumer.subscribe(Arrays.asList("consumertest"));  
                     while (true) {  
                         ConsumerRecords<String, String> records = consumer.poll(100);  
                         for (ConsumerRecord<String, String> record : records){  
                             System.out.println("Data recieved : "+record.value());  
                             }  
                     }

在这里,我需要永远运行消费者,以便生产者推送到 kafka 主题的任何记录都应该立即消费和处理。
所以我的困惑是,使用无限 while 循环(如示例代码中)来使用数据是否是正确的方法?

最佳答案

虽然可以无限循环,但可以在 Kafka 消费者 documentation 中找到一种稍微优雅的方法。如下:

public class KafkaConsumerRunner implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer consumer;

    public void run() {
        try {
            consumer.subscribe(Arrays.asList("topic"));
            while (!closed.get()) {
                ConsumerRecords records = consumer.poll(10000);
                // Handle new records
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) throw e;
        } finally {
           consumer.close();
        }
    }

    // Shutdown hook which can be called from a separate thread
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}

这使您可以选择使用 Hook 正常关闭。

关于java - 永远运行kafka消费者(新消费者API),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38095711/

相关文章:

apache-kafka - 如何知道作为主动 Controller 的经纪人?

apache-kafka - 计算存储在 kafka 主题中的消息数

apache-kafka - kafka-python 消费者给出错误

apache-kafka - 卡夫卡休息的例子

java - 打印循环列表

Java 注释 "Inheritance"

java - 在 Apache Kafka 中创建主题时出错

apache-kafka - 了解 Kafka 主题和分区

Java 算法 : comparing each *thing* to every other

java - 仍然无法创建该表。问题是什么?