java - Kafka Consumer cannot consume a 905-byte message with JsonDeserializer

Tags: java spring apache-kafka kafka-consumer-api spring-kafka

We are using Spring with a Kafka consumer and producer. We produce messages 905 bytes in size, serialize them, and then try to deserialize them in the next consumer.

Example message payload:

{  
        "FILE_LIST":[  
            {
                "KEY1": "Large String Value",
                "KEY2": "Large String Value",
                "Key3": "Large String Value",
                "Key4": "Large String Value"
            }
        ]
}

Payload class

package com.consumer.model;

import com.fasterxml.jackson.annotation.JsonProperty;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@NoArgsConstructor
@AllArgsConstructor
public class InnerModel {

    @JsonProperty("KEY1")
    @Getter
    @Setter
    private String key1;

    @JsonProperty("KEY2")
    @Getter
    @Setter
    private String key2;

    @JsonProperty("KEY3")
    @Getter
    @Setter
    private String key3;

    @JsonProperty("KEY4")
    @Getter
    @Setter
    private String key4;
}

CustomModel.java

package com.consumer.model;

import java.util.List;

import com.fasterxml.jackson.annotation.JsonProperty;

import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;

public class CustomModel {

    public CustomModel(List<InnerModel> filesList) {
        super();
        this.filesList = filesList;
    }

    @JsonProperty("FILE_LIST")
    @NonNull
    @Getter
    @Setter
    List<InnerModel> filesList;
}

Consumer code

import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@KafkaListener(topics = "customtopic", groupId = "customgroup")
public class CustomModelConsumer {
    @KafkaHandler(isDefault = true)
    private void consumeCustomModel(CustomModel model) {
        System.out.println("Model Consumer");
        System.out.println(model);
    }
}

application.properties

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.compression-type=gzip 
spring.kafka.consumer.properties.spring.json.trusted.packages=com.consumer.model
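As a side note, when the producer does not set spring-kafka's type headers, JsonDeserializer can also be given an explicit target type through properties. A hedged sketch, using the `spring.json.value.default.type` property key documented for spring-kafka's JsonDeserializer and the `com.consumer.model` package from the question:

```properties
# Fall back to this class when no type headers are present on the record
spring.kafka.consumer.properties.spring.json.value.default.type=com.consumer.model.CustomModel
spring.kafka.consumer.properties.spring.json.trusted.packages=com.consumer.model
```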

The consumer works fine when we accept the message payload as a String, but when we deserialize the payload into an object in the consumer we run into a problem, and the following error is thrown:

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition customtopic-0 at offset 70. If needed, please seek past the record to continue consumption. Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[123, 34, 73, 78, 80, 85, 84, 95, 70, 73, 76, 69, 95, 76, 73, 83, 84, 34, 58, 91, 123, 34, 79, 82, 73, 71, 73, 78, 65, 76, 95, 67, 76, 73, 80, 95, 67, 85, 82, 82, 69, 78, 84, 34, 58, 34, 115, 51, 58, 47, 47, 98, 45, 97, 111, 45, 112, 114, 111, 100, 117, 99, 116, 45, 109, 111, 99, 107, 47, 55, 52, 49, 48, 57, 102, 98, 98, 45, 54, 102, 52, 101, 45, 52, 48, 54, 50, 45, 97, 98, 48, 102, 45, 100, 49, 102, 53, 98, 98, 102, 55, 49, 97, 56, 49, 47, 99, 108, 105, 112, 115, 47, 48, 58, 48, 48, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 51, 58, 48, 48, 46, 50, 48, 48, 47, 49, 48, 48, 48, 95, 111, 114, 105, 103, 105, 110, 97, 108, 99, 108, 105, 112, 47, 48, 58, 48, 48, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 51, 58, 48, 48, 46, 50, 48, 48, 47, 48, 58, 48, 48, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 51, 58, 48, 48, 46, 50, 48, 48, 47, 48, 58, 48, 48, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 51, 58, 48, 48, 46, 50, 48, 48, 46, 109, 112, 52, 34, 44, 34, 79, 82, 73, 71, 73, 78, 65, 76, 95, 67, 76, 73, 80, 95, 71, 76, 79, 66, 65, 76, 95, 84, 73, 77, 69, 83, 84, 65, 77, 80, 95, 67, 85, 82, 82, 69, 78, 84, 34, 58, 34, 115, 51, 58, 47, 47, 98, 45, 97, 111, 45, 112, 114, 111, 100, 117, 99, 116, 45, 109, 111, 99, 107, 47, 55, 52, 49, 48, 57, 102, 98, 98, 45, 54, 102, 52, 101, 45, 52, 48, 54, 50, 45, 97, 98, 48, 102, 45, 100, 49, 102, 53, 98, 98, 102, 55, 49, 97, 56, 49, 47, 99, 108, 105, 112, 115, 47, 48, 58, 48, 48, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 51, 58, 48, 48, 46, 50, 48, 48, 47, 49, 48, 48, 48, 95, 111, 114, 105, 103, 105, 110, 97, 108, 99, 108, 105, 112, 47, 48, 58, 48, 48, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 51, 58, 48, 48, 46, 50, 48, 48, 47, 48, 58, 48, 48, 58, 48, 48, 
46, 48, 48, 48, 95, 48, 58, 48, 51, 58, 48, 48, 46, 50, 48, 48, 95, 116, 115, 47, 48, 58, 48, 48, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 51, 58, 48, 48, 46, 50, 48, 48, 95, 116, 115, 46, 106, 115, 111, 110, 34, 44, 34, 79, 82, 73, 71, 73, 78, 65, 76, 95, 67, 76, 73, 80, 95, 78, 69, 88, 84, 34, 58, 34, 115, 51, 58, 47, 47, 98, 45, 97, 111, 45, 112, 114, 111, 100, 117, 99, 116, 45, 109, 111, 99, 107, 47, 55, 52, 49, 48, 57, 102, 98, 98, 45, 54, 102, 52, 101, 45, 52, 48, 54, 50, 45, 97, 98, 48, 102, 45, 100, 49, 102, 53, 98, 98, 102, 55, 49, 97, 56, 49, 47, 99, 108, 105, 112, 115, 47, 48, 58, 48, 51, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 54, 58, 48, 48, 46, 50, 48, 48, 47, 49, 48, 48, 48, 95, 111, 114, 105, 103, 105, 110, 97, 108, 99, 108, 105, 112, 47, 48, 58, 48, 51, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 54, 58, 48, 48, 46, 50, 48, 48, 47, 48, 58, 48, 51, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 54, 58, 48, 48, 46, 50, 48, 48, 47, 48, 58, 48, 51, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 54, 58, 48, 48, 46, 50, 48, 48, 46, 109, 112, 52, 34, 44, 34, 79, 82, 73, 71, 73, 78, 65, 76, 95, 67, 76, 73, 80, 95, 71, 76, 79, 66, 65, 76, 95, 84, 73, 77, 69, 83, 84, 65, 77, 80, 95, 78, 69, 88, 84, 34, 58, 34, 115, 51, 58, 47, 47, 98, 45, 97, 111, 45, 112, 114, 111, 100, 117, 99, 116, 45, 109, 111, 99, 107, 47, 55, 52, 49, 48, 57, 102, 98, 98, 45, 54, 102, 52, 101, 45, 52, 48, 54, 50, 45, 97, 98, 48, 102, 45, 100, 49, 102, 53, 98, 98, 102, 55, 49, 97, 56, 49, 47, 99, 108, 105, 112, 115, 47, 48, 58, 48, 51, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 54, 58, 48, 48, 46, 50, 48, 48, 47, 49, 48, 48, 48, 95, 111, 114, 105, 103, 105, 110, 97, 108, 99, 108, 105, 112, 47, 48, 58, 48, 51, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 54, 58, 48, 48, 46, 50, 48, 48, 47, 48, 58, 48, 51, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 54, 58, 48, 48, 46, 50, 48, 48, 95, 116, 115, 47, 48, 58, 48, 51, 58, 48, 48, 46, 48, 48, 48, 95, 48, 58, 48, 54, 58, 48, 48, 46, 50, 48, 48, 
95, 116, 115, 46, 106, 115, 111, 110, 34, 125, 93, 125]] from topic [customtopic] Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of com.consumer.model.CustomModel (no Creators, like default construct, exist): cannot deserialize from Object value (no delegate- or property-based Creator) at [Source: (byte[])"{"FILE_LIST":[{"KEY1":"Large String Value","KEY2":"Large String Value","Key3":"Large String Value"[truncated 405 bytes]; line: 1, column: 2] at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:67) ~[jackson-databind-2.9.5.jar:2.9.5] at com.fasterxml.jackson.databind.DeserializationContext.reportBadDefinition(DeserializationContext.java:1451) ~[jackson-databind-2.9.5.jar:2.9.5] at com.fasterxml.jackson.databind.DeserializationContext.handleMissingInstantiator(DeserializationContext.java:1027) ~[jackson-databind-2.9.5.jar:2.9.5] at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1290) ~[jackson-databind-2.9.5.jar:2.9.5] at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:326) ~[jackson-databind-2.9.5.jar:2.9.5] at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159) ~[jackson-databind-2.9.5.jar:2.9.5] at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:1611) ~[jackson-databind-2.9.5.jar:2.9.5] at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1234) ~[jackson-databind-2.9.5.jar:2.9.5] at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:198) ~[spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE] at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923) ~[kafka-clients-1.0.0.jar:na] at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93) 
~[kafka-clients-1.0.0.jar:na] at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100) ~[kafka-clients-1.0.0.jar:na] at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949) ~[kafka-clients-1.0.0.jar:na] at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570) ~[kafka-clients-1.0.0.jar:na] at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531) ~[kafka-clients-1.0.0.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146) ~[kafka-clients-1.0.0.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103) ~[kafka-clients-1.0.0.jar:na] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:667) ~[spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
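The root cause is at the end of the trace: `Cannot construct instance of com.consumer.model.CustomModel (no Creators, like default construct, exist)`. The "please seek past the record" wording also hints at a second, operational issue: a record that fails deserialization is retried forever and blocks the partition. Later spring-kafka versions (2.2+, so not the 2.1.4 seen in this trace) can mitigate this by wrapping the value deserializer; a hedged properties sketch:

```properties
# Wrap the JSON deserializer so a poison-pill record fails gracefully
# (requires spring-kafka 2.2+; the class was named ErrorHandlingDeserializer2 before 2.5)
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer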

Accepted answer

As @Naveen Kumar pointed out, I was missing a default or all-args constructor. Changing the code as shown below made it work:

package com.consumer.model;

import java.util.List;

import com.fasterxml.jackson.annotation.JsonProperty;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.Setter;

@NoArgsConstructor
@AllArgsConstructor
public class CustomModel {

    @JsonProperty("FILE_LIST")
    @NonNull
    @Getter
    @Setter
    List<InnerModel> filesList;
}
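Why this fixes it: Jackson's default bean deserializer instantiates the target class through a no-arg constructor (or an explicitly annotated creator), and Lombok's @AllArgsConstructor alone does not generate one. A minimal, Lombok-free sketch (the class names AllArgsOnly and WithNoArgs are hypothetical) that checks via reflection which variant exposes the default constructor Jackson needs:

```java
public class ConstructorCheck {

    // Like the broken CustomModel: only an all-args constructor is declared,
    // so the compiler does not generate an implicit no-arg one
    static class AllArgsOnly {
        final String value;
        AllArgsOnly(String value) { this.value = value; }
    }

    // Like the fixed model: @NoArgsConstructor adds the default constructor
    static class WithNoArgs {
        String value;
        WithNoArgs() { }
        WithNoArgs(String value) { this.value = value; }
    }

    // True if the class has the no-arg constructor Jackson would call
    static boolean hasDefaultConstructor(Class<?> cls) {
        try {
            cls.getDeclaredConstructor(); // no-arg lookup; throws if absent
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(hasDefaultConstructor(AllArgsOnly.class)); // prints false
        System.out.println(hasDefaultConstructor(WithNoArgs.class));  // prints true
    }
}
```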

Regarding "java - Kafka Consumer cannot consume a 905-byte message with JsonDeserializer", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/49721019/
