apache-kafka - High-level Kafka consumer API not working

Tags: apache-kafka kafka-consumer-api kafka-producer-api

I set up a single-node Kafka broker and tried a simple publish/subscribe pattern.

On my laptop, I produce a few messages with this code:

    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.23.152:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 10; i++)
        producer.send(new ProducerRecord<String, String>("tp3", Integer.toString(i), "hello " + Integer.toString(i)));

    producer.close();

I also wrote a simple consumer:

    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.23.152:9092");
    props.put("group.id", "g1");
    props.put("client.id","client1");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("auto.offset.reset", "latest");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("tp3"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
        TimeUnit.SECONDS.sleep(1000);
    }

But the consumer never receives anything.

Can someone explain what is going on? I'm sure the producer works fine, because I can retrieve the messages with the console consumer command (screenshot attached).

Any help is appreciated :( :( :(

Best Answer

According to the Kafka FAQ:

Why does my consumer never get any data?

By default, when a consumer is started for the very first time, it ignores all existing data in a topic and will only consume new data coming in after the consumer is started. If this is the case, try sending some more data after the consumer is started. Alternatively, you can configure the consumer by setting auto.offset.reset to "earliest" for the new consumer in 0.9 and "smallest" for the old consumer.
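Note that `auto.offset.reset` only applies when the group has no committed offset; since group `g1` may already have commits, switching to a fresh group id is also needed to see old data. A minimal sketch of the corrected consumer configuration, assuming the broker address from the question and a hypothetical new group id `g2`:

```java
import java.util.Properties;

public class ConsumerConfigFix {
    // Builds consumer properties that make a first-time consumer group
    // read a topic from the beginning instead of only new messages.
    static Properties fixedProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.23.152:9092");
        // A fresh group id (assumed here) so no committed offsets exist yet,
        // which lets auto.offset.reset take effect.
        props.put("group.id", "g2");
        props.put("enable.auto.commit", "true");
        // Key change: start from the earliest available offset
        // ("earliest" for the new 0.9+ consumer, "smallest" for the old one).
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(fixedProps().getProperty("auto.offset.reset"));
    }
}
```

Alternatively, keep `latest` and simply produce messages after the consumer has subscribed, or rewind explicitly with `consumer.seekToBeginning(...)` once partitions are assigned.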

This question ("High-level Kafka consumer API not working") corresponds to a similar question on Stack Overflow: https://stackoverflow.com/questions/37832125/
