java - Apache Kafka and Avro: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer

Tags: java apache-kafka avro kafka-consumer-api kafka-producer-api

Whenever I try to read a message from the Kafka queue, I get the following exception:

[error] (run-main-0) java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer
java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer
        at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.infiniteConsumer(AvroSpecificDeserializer.java:79)
        at com.harmeetsingh13.java.consumers.avrodesrializer.AvroSpecificDeserializer.main(AvroSpecificDeserializer.java:87)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)

Kafka producer code:

public class AvroSpecificProducer {
    private static Properties kafkaProps = new Properties();
    private static KafkaProducer<String, Customer> kafkaProducer;

    static {
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
        kafkaProducer = new KafkaProducer<>(kafkaProps);
    }

    public static void fireAndForget(ProducerRecord<String, Customer> record) {
        kafkaProducer.send(record);
    }

    public static void asyncSend(ProducerRecord<String, Customer> record) {
        kafkaProducer.send(record, (recordMetaData, ex) -> {
            // If the send failed, the metadata may be null, so report the error and stop.
            if (ex != null) {
                ex.printStackTrace();
                return;
            }
            System.out.println("Offset: "+ recordMetaData.offset());
            System.out.println("Topic: "+ recordMetaData.topic());
            System.out.println("Partition: "+ recordMetaData.partition());
            System.out.println("Timestamp: "+ recordMetaData.timestamp());
        });
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        Customer customer1 = new Customer(1002, "Jimmy");
        ProducerRecord<String, Customer> record1 = new ProducerRecord<>("CustomerSpecificCountry",
                "Customer One 11 ", customer1
        );

        asyncSend(record1);

        Thread.sleep(1000);
    }
}

Kafka consumer code:

public class AvroSpecificDeserializer {

    private static Properties kafkaProps = new Properties();

    static {
        kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1");
        kafkaProps.put("zookeeper.connect", "localhost:2181");
        kafkaProps.put("schema.registry.url", "http://localhost:8081");
    }

    public static void infiniteConsumer() throws IOException {
        VerifiableProperties properties = new VerifiableProperties(kafkaProps);
        KafkaAvroDecoder keyDecoder = new KafkaAvroDecoder(properties);
        KafkaAvroDecoder valueDecoder = new KafkaAvroDecoder(properties);

        Map<String, Integer> topicCountMap = new HashMap<>();
        topicCountMap.put("NewTopic", 1);

        ConsumerConnector consumer = createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(kafkaProps));
        Map<String, List<KafkaStream<Object, Object>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

        KafkaStream stream = consumerMap.get("NewTopic").get(0);
        ConsumerIterator it = stream.iterator();

        System.out.println("???????????????????????????????????????????????? ");
        while (it.hasNext()) {
            System.out.println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ");
            MessageAndMetadata messageAndMetadata = it.next();
            String key = (String) messageAndMetadata.key();
            GenericRecord record = (GenericRecord) messageAndMetadata.message();
            Customer customer = (Customer) SpecificData.get().deepCopy(Customer.SCHEMA$, record);
            System.out.println("Key: " + key);
            System.out.println("Value: " + customer);
        }

    }

    public static void main(String[] args) throws IOException {
        infiniteConsumer();
    }
}

I am following these examples:

  1. https://github.com/confluentinc/examples/blob/3.1.x/kafka-clients/specific-avro-producer/src/main/java/io/confluent/examples/producer/AvroClicksProducer.java
  2. https://github.com/confluentinc/examples/blob/3.1.x/kafka-clients/specific-avro-consumer/src/main/java/io/confluent/examples/consumer/AvroClicksSessionizer.java

Best answer

After discussing this with @harmeen, this is the final working code:

static {
    // Start from the beginning of the topic when the group has no committed offset.
    kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest");
    kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "CustomerCountryGroup1");
    kafkaProps.put("zookeeper.connect", "localhost:2181");
    kafkaProps.put("schema.registry.url", "http://localhost:8081");
    // Ask the Avro decoder for specific (generated) records instead of generic ones.
    kafkaProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
}

public static void infiniteConsumer() throws IOException {

    VerifiableProperties properties = new VerifiableProperties(kafkaProps);
    // Keys are plain strings; only the values are Avro-encoded.
    StringDecoder keyDecoder = new StringDecoder(properties);
    KafkaAvroDecoder valueDecoder = new KafkaAvroDecoder(properties);

    Map<String, Integer> topicCountMap = new HashMap<>();
    topicCountMap.put("BrandNewTopics", 1);

    ConsumerConnector consumer = createJavaConsumerConnector(new kafka.consumer.ConsumerConfig(kafkaProps));
    Map<String, List<KafkaStream<String, Object>>> consumerMap = consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

    KafkaStream stream = consumerMap.get("BrandNewTopics").get(0);
    ConsumerIterator it = stream.iterator();

    while (it.hasNext()) {
        MessageAndMetadata messageAndMetadata = it.next();
        String key = (String) messageAndMetadata.key();
        GenericRecord record = (GenericRecord) messageAndMetadata.message();
        // Convert the decoded record into the generated Customer class.
        Customer customer = (Customer) SpecificData.get().deepCopy(Customer.SCHEMA$, record);
        System.out.println("Key: " + key);
        System.out.println("Value: " + customer);
    }
}
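
The exception in the question comes down to a Java rule: a cast never converts an object into an unrelated class, even when both classes share an interface. The sketch below illustrates that rule with the standard library only. Rec, GenericRec and CustomerRec are hypothetical stand-ins rather than Avro types, and the field names "id" and "name" are assumptions based on the Customer(1002, "Jimmy") call in the producer. In the real consumer, the conversion is done by the SpecificData.get().deepCopy(...) call shown above.

```java
import java.util.HashMap;
import java.util.Map;

public class CastSketch {
    // Hypothetical stand-in for an interface shared by both record kinds.
    interface Rec {
        Object get(String field);
    }

    // Hypothetical stand-in for a generic record that stores fields by name.
    static final class GenericRec implements Rec {
        private final Map<String, Object> fields = new HashMap<>();

        GenericRec put(String name, Object value) {
            fields.put(name, value);
            return this;
        }

        @Override
        public Object get(String field) {
            return fields.get(field);
        }
    }

    // Hypothetical stand-in for a generated, strongly typed record class.
    static final class CustomerRec implements Rec {
        final int id;
        final String name;

        CustomerRec(int id, String name) {
            this.id = id;
            this.name = name;
        }

        @Override
        public Object get(String field) {
            switch (field) {
                case "id": return id;
                case "name": return name;
                default: return null;
            }
        }
    }

    // Returns true when a direct cast to CustomerRec throws ClassCastException.
    static boolean castFails(Rec source) {
        try {
            CustomerRec ignored = (CustomerRec) source;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    // Builds a typed record by copying fields, which is the idea behind a deep copy.
    static CustomerRec toCustomer(Rec source) {
        return new CustomerRec((Integer) source.get("id"), (String) source.get("name"));
    }

    public static void main(String[] args) {
        Rec decoded = new GenericRec().put("id", 1002).put("name", "Jimmy");
        System.out.println("Direct cast fails: " + castFails(decoded));
        CustomerRec customer = toCustomer(decoded);
        System.out.println("Converted: " + customer.id + " " + customer.name);
    }
}
```

The point of the sketch is only that the typed object has to be constructed from the generic one; a cast alone cannot do that.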

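What changed:

  • Added the SPECIFIC_AVRO_READER_CONFIG property, set to true.
  • Set the offset reset to smallest so the consumer starts from the beginning of the topic.
  • Used StringSerializer/StringDeserializer for the keys.
  • Changed both the producer and the consumer to reflect the previous change.
  • Adjusted the namespace of the Customer class that represents the Avro record.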
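
The configuration side of the changes listed above can be sketched as plain properties. This is a minimal sketch, not the answerer's exact code: it assumes the string property names that the constants resolve to ("key.serializer", "value.serializer", "auto.offset.reset", "group.id", "specific.avro.reader"), reuses the hosts and group id from the question, and the class name AvroClientConfigSketch is hypothetical. It builds the properties only and does not connect to Kafka.

```java
import java.util.Properties;

public class AvroClientConfigSketch {
    // Producer side: string keys, Avro values registered with the schema registry.
    static Properties producerProps() {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", "localhost:9092");
        p.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.setProperty("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        p.setProperty("schema.registry.url", "http://localhost:8081");
        return p;
    }

    // Consumer side (old ZooKeeper-based consumer, as in the answer).
    static Properties consumerProps() {
        Properties p = new Properties();
        p.setProperty("group.id", "CustomerCountryGroup1");
        p.setProperty("zookeeper.connect", "localhost:2181");
        // "smallest" makes a group without committed offsets read from the start of the topic.
        p.setProperty("auto.offset.reset", "smallest");
        p.setProperty("schema.registry.url", "http://localhost:8081");
        // Ask the Avro decoder to return generated classes rather than generic records.
        p.setProperty("specific.avro.reader", "true");
        return p;
    }

    public static void main(String[] args) {
        System.out.println("Producer: " + producerProps());
        System.out.println("Consumer: " + consumerProps());
    }
}
```

Keeping the key serializer on the producer and the key decoder on the consumer in agreement matters here: if one side writes Avro-encoded keys and the other reads plain strings, the keys will not decode as expected.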

Regarding java - Apache Kafka and Avro: org.apache.avro.generic.GenericData$Record cannot be cast to com.harmeetsingh13.java.Customer, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/42200875/
