apache-kafka - Apache Kafka: ...StringDeserializer is not an instance of ...Deserializer

Tags: apache-kafka kafka-consumer-api

In my simple application I am trying to instantiate a KafkaConsumer. My code is almost a copy of the code from the javadoc ("Automatic Offset Committing"):

import java.util.Arrays;
import java.util.Properties;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

@Slf4j
public class MyKafkaConsumer {

    public MyKafkaConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        // The deserializers are configured by class name; the consumer instantiates them itself.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("mytopic"));
        while (true) {
            // poll(long) takes a timeout in milliseconds.
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                log.info("offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
            }
        }
    }
}

When I try to instantiate it, I get:

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:781)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:635)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:617)
        at ...MyKafkaConsumer.<init>(SikomKafkaConsumer.java:23)
        ...
Caused by: org.apache.kafka.common.KafkaException: org.apache.kafka.common.serialization.StringDeserializer is not an instance of org.apache.kafka.common.serialization.Deserializer
        at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:248)
        at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:680)
        ... 48 more

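How can I fix this?

Best answer

I am not sure whether this is what ultimately resolves your error, but note that when using spring-kafka-test (version 2.1.x, starting with version 2.1.5) together with the 1.1.x kafka-clients jar, you need to override certain transitive dependencies as follows: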

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>${spring.kafka.version}</version>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <version>${spring.kafka.version}</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.1</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.1</version>
    <classifier>test</classifier>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.1.1</version>
    <scope>test</scope>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.1.1</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>
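
For context, the "is not an instance of" message in the stack trace appears to come from a reflective step: the configured class is instantiated by name and the resulting object is then checked against the expected interface. The JVM identifies a class by its name together with the class loader that defined it, so two same-named copies of a type do not satisfy each other's checks, which can happen when more than one copy of kafka-clients is visible to the application. The sketch below imitates that check using JDK classes only. `ConfiguredInstanceSketch` and `newConfiguredInstance` are hypothetical names, and this is a simplified stand-in, not Kafka's actual source.

```java
class ConfiguredInstanceSketch {

    /**
     * Hypothetical, simplified stand-in for a "create by class name, then
     * verify the type" step. Not Kafka's real implementation.
     */
    static <T> T newConfiguredInstance(String className, Class<T> expected, ClassLoader loader) {
        Object instance;
        Class<?> loaded;
        try {
            // Resolve the class by name through the given loader.
            loaded = Class.forName(className, true, loader);
            instance = loaded.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + className, e);
        }
        // isInstance compares against this exact Class object. A same-named
        // interface defined by a different class loader would not match.
        if (!expected.isInstance(instance)) {
            throw new IllegalStateException(
                    loaded.getName() + " is not an instance of " + expected.getName());
        }
        return expected.cast(instance);
    }

    public static void main(String[] args) {
        ClassLoader loader = ClassLoader.getSystemClassLoader();
        // Passes: ArrayList implements List.
        System.out.println(newConfiguredInstance("java.util.ArrayList", java.util.List.class, loader));
        try {
            // Fails: ArrayList does not implement Map.
            newConfiguredInstance("java.util.ArrayList", java.util.Map.class, loader);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In the JDK-only demo the check fails because the types are genuinely unrelated; in the situation from the question the names match, so a failure of this kind suggests that the interface and the implementation were not resolved from the same place.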

So this is very likely a problem with your transitive dependencies.
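
To check whether conflicting jars are involved, one option is to ask the JVM where it actually finds the two types. The following is a minimal sketch, assuming it is run with the same classpath as the failing application. `ClasspathDiagnostics`, `resourcePath`, `findCopies`, and `describe` are hypothetical helper names; only standard JDK calls are used.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.security.CodeSource;
import java.util.Collections;
import java.util.List;

class ClasspathDiagnostics {

    /** Converts a fully qualified class name to its class-file resource path. */
    static String resourcePath(String className) {
        return className.replace('.', '/') + ".class";
    }

    /** Lists every location where the loader can see a class file with this name. */
    static List<URL> findCopies(String className, ClassLoader loader) {
        try {
            return Collections.list(loader.getResources(resourcePath(className)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Describes which loader defined the class and where it was loaded from. */
    static String describe(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        // JDK classes typically have no code source.
        Object location = (source == null) ? "<no code source>" : source.getLocation();
        return type.getName() + " loaded by " + type.getClassLoader() + " from " + location;
    }

    public static void main(String[] args) {
        // Defaults to the two types named in the exception; pass other names as arguments.
        String[] names = args.length > 0 ? args : new String[] {
            "org.apache.kafka.common.serialization.Deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer"
        };
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClassLoader.getSystemClassLoader();
        }
        for (String name : names) {
            for (URL url : findCopies(name, loader)) {
                System.out.println("copy: " + url);
            }
            try {
                // initialize=false: we only want to know where the class lives.
                System.out.println(describe(Class.forName(name, false, loader)));
            } catch (ClassNotFoundException e) {
                System.out.println(name + " is not on the classpath");
            }
        }
    }
}
```

More than one `copy:` line for the same class, or different loaders reported for the two types, would point to conflicting jars. On the build side, `mvn dependency:tree -Dincludes=org.apache.kafka` shows which dependencies pull in each Kafka artifact.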

Regarding "apache-kafka - Apache Kafka: ...StringDeserializer is not an instance of ...Deserializer", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/48864032/
