我正在用 Java 实现一个简单的 Kafka 消费者。这是代码:
public class TestConsumer {
public static void main(String []a) throws Exception{
Properties props = new Properties();
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("partition.assignment.strategy", "round-robin");
props.put("group.id", "test");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
try{
consumer.subscribe("ay_sparktopic");
Map<String, ConsumerRecords<String, String>> msg = consumer.poll(100);
System.out.println(msg);
}catch(Exception e){
System.out.println("Exception");
}
}
}
上述消费者给出以下错误消息:
16/03/30 18:01:07 WARN ConsumerConfig: The configuration group.id = test was supplied but isn't a known config. 16/03/30 18:01:07 WARN ConsumerConfig: The configuration partition.assignment.strategy = round-robin was supplied but isn't a known config.
我在线查看的任何文档都给出了范围或循环作为可能的分配策略,而 groupId 是据我所知的自定义名称。不确定这里的正确配置值是什么。
最佳答案
您似乎正在尝试使用仅在 Kafka 0.9+ 中可用的新消费者 API。要使用旧版 API,您必须从 kafka.javaapi.consumer.*
包而不是新的 org.apache.kafka.clients.consumer
包导入类。
consumer.subscribe
和 consumer.poll
与新 API 相关,因此如果您确实想使用旧 API,则需要相应更改代码。如果您想使用新的消费者 API,则需要运行 Kafka 0.9 或更高版本。
关于java - 卡夫卡 0.8.2 消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36322768/