我正在尝试让一个简单的 Kafka Consumer 使用 Java API v0.9.0.1 工作。我使用的kafka服务器是一个docker容器,也运行0.9.0.1版本。下面是消费者代码:
public class Consumer {
public static void main(String[] args) throws IOException {
KafkaConsumer<String, String> consumer;
try (InputStream props = Resources.getResource("consumer.props").openStream()) {
Properties properties = new Properties();
properties.load(props);
consumer = new KafkaConsumer<>(properties);
}
consumer.subscribe(Arrays.asList("messages"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.println("Message received: " + record.value());
}
}catch(WakeupException ex){
System.out.println("Exception caught " + ex.getMessage());
}finally{
consumer.close();
System.out.println("After closing KafkaConsumer");
}
}
}
但是,当启动消费者时,它会调用上面的 poll(100) 方法并且永远不会返回。调试,看起来它永远卡在 org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient 中运行以下方法:
public void awaitMetadataUpdate() {
int version = this.metadata.requestUpdate();
do {
this.poll(9223372036854775807L);
} while(this.metadata.version() == version);
}
(版本和 this.metadata.version() 似乎总是 == 2)。此外,虽然它没有抛出任何错误,但来 self 的 java 生产者的消息从未出现在队列中。我已经验证,使用命令行 kafka 工具,我可以从队列发送和接收消息。
有人知道这里发生了什么吗?
最佳答案
如果这可以帮助其他遇到类似问题的人,我的解决方案是设置以下环境变量:
ADVERTISED_HOST=localhost
ADVERTISED_PORT=9092
(当然,此处的值可能会更改以适合您的安装)
显然,命令行消费者和生产者脚本可以在不设置这些环境变量的情况下设法找到代理并与代理正确通信,但 Java API 实现却不能。也不会引发任何错误,只是在尝试更新元数据时在第一次轮询时出现无限循环。
关于java - Kafka 0.9.0.1 Java Consumer陷入awaitMetadataUpdate(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37770024/