java - 卡夫卡 0.8.2 消费者

标签 java apache-kafka kafka-consumer-api

我正在用 Java 实现一个简单的 Kafka 消费者。这是代码:

public class TestConsumer {

    public static void main(String []a) throws Exception{
        Properties props = new Properties();
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("partition.assignment.strategy", "round-robin");
        props.put("group.id", "test");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        try{
            consumer.subscribe("ay_sparktopic");
            Map<String, ConsumerRecords<String, String>> msg = consumer.poll(100);
            System.out.println(msg);                
        }catch(Exception e){
            System.out.println("Exception");
        }
    }
}

上述消费者给出以下错误消息:

16/03/30 18:01:07 WARN ConsumerConfig: The configuration group.id = test was supplied but isn't a known config. 16/03/30 18:01:07 WARN ConsumerConfig: The configuration partition.assignment.strategy = round-robin was supplied but isn't a known config.

我在线查看的任何文档都给出了范围或循环作为可能的分配策略,而 groupId 是据我所知的自定义名称。不确定这里的正确配置值是什么。

最佳答案

您似乎正在尝试使用仅在 Kafka 0.9+ 中可用的新消费者 API。要使用旧版 API,您必须从 kafka.javaapi.consumer.* 包而不是新的 org.apache.kafka.clients.consumer 包导入类。

consumer.subscribeconsumer.poll 与新 API 相关,因此如果您确实想使用旧 API,则需要相应更改代码。如果您想使用新的消费者 API,则需要运行 Kafka 0.9 或更高版本。

关于java - 卡夫卡 0.8.2 消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36322768/

相关文章:

apache-kafka - 无法使用 kafka-avro-console-consumer 读取 avro 消息。序列化异常 : Unknown magic byte

java - 如何获取 kafka 主题分区的最后/结束偏移量?

c# - 如何从 Confluent kafka C# 库中的 Kafka 主题获取最新偏移量?

java - 适用于 Android 的 Fabric crashlytics

java - JComponent 未在 JFrame 中显示

java - 在 Glassfish 4 上运行 Spring 3 应用程序

apache-kafka - Kafka多分区排序

apache-kafka - KTable 状态存储无限保留

java - 无法使用kafka Producer API与kafka服务器通信

java - java util 类中带有 BASE64 的 SHA1 无法生成正确的密码