我设置了一个单节点 kafka 并尝试了一个简单的发布/订阅模式:
在我的笔记本电脑上,我通过代码生成了一些消息:
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.23.152:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 10; i++)
producer.send(new ProducerRecord<String, String>("tp3", Integer.toString(i), "hello " + Integer.toString(i)));
producer.close();
我还写了一个简单的消费者:
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.23.152:9092");
props.put("group.id", "g1");
props.put("client.id","client1");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "latest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("tp3"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
TimeUnit.SECONDS.sleep(1000);
}
但消费者没有取回任何东西
有人向我解释一下发生了什么事吗? 我确信生产者工作得很好,因为我使用控制台命令来检索消息并且它工作得很好(我在这里附上经过验证的图像)
任何帮助表示赞赏:( :( :(
最佳答案
根据卡夫卡FAQ :
Why does my consumer never get any data?
By default, when a consumer is started for the very first time, it ignores all existing data in a topic and will only consume new data coming in after the consumer is started. If this is the case, try sending some more data after the consumer is started. Alternatively, you can configure the consumer by setting auto.offset.reset to "earliest" for the new consumer in 0.9 and "smallest" for the old consumer.
关于apache-kafka - 高级 Kafka 消费者 API 不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37832125/