java - Unable to deserialize data using Kafka and Spring Boot

Tags: java spring-boot apache-kafka spring-kafka

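I am trying to integrate a Spring Boot application with a remote Kafka broker, but I get an error when the application starts. The error occurs while the consumer is listening to the remote Kafka topic; see the log below: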

2020-04-02 08:28:58.795  INFO 17760 --- [  restartedMain] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService
2020-04-02 08:28:58.855  INFO 17760 --- [  restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8081 (http) with context path ''
2020-04-02 08:28:58.858  INFO 17760 --- [  restartedMain] kafka.topic.queue.Application            : Started Application in 16.273 seconds (JVM running for 18.555)
2020-04-02 08:28:59.874  INFO 17760 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-1, groupId=json] Cluster ID: l6gelyg5RtKbGqghgTYnAA
2020-04-02 08:28:59.876  INFO 17760 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=json] Discovered group coordinator xx.xxx.5.xxx:9092 (id: 2147483646 rack: null)
2020-04-02 08:28:59.884  INFO 17760 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=json] Revoking previously assigned partitions []
2020-04-02 08:28:59.886  INFO 17760 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : json: partitions revoked: []
2020-04-02 08:28:59.887  INFO 17760 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=json] (Re-)joining group
2020-04-02 08:29:00.805  INFO 17760 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=json] (Re-)joining group
2020-04-02 08:29:01.617  INFO 17760 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=json] Successfully joined group with generation 37
2020-04-02 08:29:01.635  INFO 17760 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=json] Setting newly assigned partitions: devtopic-2, devtopic-1, devtopic-0
2020-04-02 08:29:02.026  INFO 17760 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=json] Setting offset for partition devtopic-2 to the committed offset FetchPosition{offset=5, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=xx.xxx.5.xxx:9092 (id: 1 rack: null), epoch=0}}
2020-04-02 08:29:02.028  INFO 17760 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=json] Setting offset for partition devtopic-1 to the committed offset FetchPosition{offset=7, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=xx.xxx.5.xxx:9092 (id: 1 rack: null), epoch=0}}
2020-04-02 08:29:02.028  INFO 17760 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=json] Setting offset for partition devtopic-0 to the committed offset FetchPosition{offset=7, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=xx.xxx.5.xxx:9092 (id: 1 rack: null), epoch=0}}
2020-04-02 08:29:02.640  INFO 17760 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : json: partitions assigned: [devtopic-2, devtopic-1, devtopic-0]
**2020-04-02 08:29:03.891  INFO 17760 --- [ntainer#0-0-C-1] k.topic.queue.consumer.MessageConsumer   : Logger 1 [JSON] received key null: Type [N/A] | Payload: Name::toString() -> {id=1,fname=test,mname=test,lname=test} | Record: ConsumerRecord(topic = devtopic, partition = 1, leaderEpoch = 0, offset = 7, CreateTime = 1585747622488, serialized key size = -1, serialized value size = 53, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = Name::toString() -> {id=1,fname=test,mname=test,lname=test})**
2020-04-02 08:29:04.300 ERROR 17760 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: null

org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition devtopic-1 at offset 8. If needed, please seek past the record to continue consumption.
Caused by: org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[108]] from topic [devtopic]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'l': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"l"; line: 1, column: 2]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840) ~[jackson-core-2.10.2.jar:2.10.2]
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:722) ~[jackson-core-2.10.2.jar:2.10.2]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3556) ~[jackson-core-2.10.2.jar:2.10.2]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2651) ~[jackson-core-2.10.2.jar:2.10.2]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:856) ~[jackson-core-2.10.2.jar:2.10.2]
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:753) ~[jackson-core-2.10.2.jar:2.10.2]
at com.fasterxml.jackson.databind.ObjectReader._initForReading(ObjectReader.java:357) ~[jackson-databind-2.10.2.jar:2.10.2]
at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:1704) ~[jackson-databind-2.10.2.jar:2.10.2]
at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1282) ~[jackson-databind-2.10.2.jar:2.10.2]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:438) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:1268) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$3600(Fetcher.java:124) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1492) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1263) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225) ~[kafka-clients-2.3.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) ~[kafka-clients-2.3.1.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1012) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:968) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:905) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [na:1.8.0_241]
at java.util.concurrent.FutureTask.run(Unknown Source) [na:1.8.0_241]
at java.lang.Thread.run(Unknown Source) [na:1.8.0_241]

2020-04-02 08:29:04.305 ERROR 17760 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: null

[same SerializationException and stack trace as the first occurrence above]

2020-04-02 08:29:04.308 ERROR 17760 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: null

[same SerializationException and stack trace as the first occurrence above]

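The error keeps repeating until I stop the application. Also, as you may notice, I was able to consume a message from the topic before the error occurred.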
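For reference, the `[[108]]` in the exception is the raw value of the record at offset 8 of devtopic-1: a single byte. The following is only a minimal sketch of how to read that message; the class `PayloadBytes` and its methods are hypothetical helpers and are not part of the application above. It decodes the byte and checks whether it could begin a JSON value at all (leading whitespace is ignored for simplicity).

```java
import java.nio.charset.StandardCharsets;

public class PayloadBytes {

    // Decode the raw record bytes reported in the exception as UTF-8 text.
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    // A JSON value can only start with an object, array, string, number,
    // or one of the literals true / false / null.
    static boolean canStartJsonValue(byte b) {
        char c = (char) b;
        return c == '{' || c == '[' || c == '"' || c == '-'
                || (c >= '0' && c <= '9')
                || c == 't' || c == 'f' || c == 'n';
    }

    public static void main(String[] args) {
        byte[] payload = {108};                            // bytes from the log: [[108]]
        System.out.println(decode(payload));               // prints "l"
        System.out.println(canStartJsonValue(payload[0])); // prints "false"
    }
}
```

Under that reading, the record at offset 8 holds the plain text `l` rather than a JSON-serialized `Name`, which is consistent with Jackson's "Unrecognized token 'l'" message.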

My configuration is below:

application.yml
server:
  port: 8081

spring:
  kafka:
    bootstrap-servers: xx.xxx.5.xxx:9092
    consumer:
      #bootstrap-servers: xx.xxx.5.xxx:9092
      group-id: json
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: kafka.topic.queue.entity
            use:
              type:
                headers: false
            value:
              default:
                type: kafka.topic.queue.entity.Name
    producer:
      #bootstrap-servers: xx.xxx.5.xxx:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        spring:
          json:
            add:
              type:
                headers: false

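MessageConsumer.java
import java.nio.charset.StandardCharsets;
import java.util.stream.StreamSupport;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import kafka.topic.queue.entity.Name;
// TopicQueueConstant is the project's own constants class (its package is not shown).

@Service
public class MessageConsumer {

    private static final Logger logger = LoggerFactory.getLogger(MessageConsumer.class);

    @KafkaListener(topics = TopicQueueConstant.TOPIC_NAME, groupId = "json",
            containerFactory = "kafkaListenerContainerFactory")
    public void listenAsObject(ConsumerRecord<String, Name> cr,
                               @Payload Name payload) {
        logger.info("Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
                typeIdHeader(cr.headers()), payload, cr);
    }

    // Returns the value of the __TypeId__ header, or "N/A" when the header is absent.
    private static String typeIdHeader(Headers headers) {
        return StreamSupport.stream(headers.spliterator(), false)
                .filter(header -> header.key().equals("__TypeId__"))
                .findFirst().map(header -> new String(header.value(), StandardCharsets.UTF_8)).orElse("N/A");
    }
}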

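MessageProducer.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import kafka.topic.queue.entity.Name;

@Service
public class MessageProducer {

    private static final Logger logger = LoggerFactory.getLogger(MessageProducer.class);

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // Note: the message parameter is not used; a fixed Name object is sent instead.
    public void sendMessageToTopic(String message) {
        logger.info("sendMessageToTopic service invoked");
        logger.info("sending to topic queue");
        Name name = new Name();
        name.setFname("test");
        name.setLname("test");
        name.setMname("test");
        name.setId(1L);
        //       Message<Name> msg = MessageBuilder
        //               .withPayload(name)
        //               .setHeader(KafkaHeaders.TOPIC, TopicQueueConstant.TOPIC_NAME)
        //               .build();
        this.kafkaTemplate.send(TopicQueueConstant.TOPIC_NAME, "1", name);
        logger.info("message has been sent successfully to topic queue");
    }
}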

Name.java (POJO)
import java.io.Serializable;

import com.fasterxml.jackson.annotation.JsonProperty;

public class Name implements Serializable {

    private static final long serialVersionUID = 5172012720819652286L;

    private long id;

    private String fname;

    private String lname;

    private String mname;

    public Name() {}

    public Name(@JsonProperty("id") final long id,
            @JsonProperty("fname") final String fname,
            @JsonProperty("lname") final String lname,
            @JsonProperty("mname") final String mname) {
        super();
        this.id = id;
        this.fname = fname;
        this.lname = lname;
        this.mname = mname;
    }

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getFname() {
        return fname;
    }

    public void setFname(String fname) {
        this.fname = fname;
    }

    public String getLname() {
        return lname;
    }

    public void setLname(String lname) {
        this.lname = lname;
    }

    public String getMname() {
        return mname;
    }

    public void setMname(String mname) {
        this.mname = mname;
    }

    @Override
    public String toString() {
        return "Name::toString() -> {"
                + "id=" + this.id
                + ",fname=" + this.fname
                + ",mname=" + this.mname
                + ",lname=" + this.lname + "}";
    }

}

This is the method in my Controller:
@Autowired
private MessageProducer producer;

@PostMapping(value = "/publish")
@ResponseBody
public ResponseEntity<Object> publishMessage(@RequestParam String message){
    logger.info("publishMessage endpoint invoked");
    logger.info("parameter received = {}", message);
    this.producer.sendMessageToTopic(message);
    return ResponseEntity.ok().build();
}

I have been stuck on this problem for almost 2 days now.

Best answer

Try configuring the JsonDeserializer on the ConsumerFactory, with the appropriate default type for deserialization, instead of configuring it in application.yml.

Steps to configure the JsonDeserializer in code: Link
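As a rough illustration of that suggestion, the deserializer can be built in a configuration class and handed to the ConsumerFactory. This is a sketch under assumptions rather than the answerer's exact code: the class name `KafkaConsumerConfig` is hypothetical, the broker address, group id, offset reset, and trusted package are copied from the question's application.yml, and the bean name matches the `containerFactory` used by the listener.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import kafka.topic.queue.entity.Name;

// Hypothetical configuration class; values mirror the question's application.yml.
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, Name> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xx.xxx.5.xxx:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "json");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        // Always deserialize values to Name and ignore any type headers.
        JsonDeserializer<Name> valueDeserializer = new JsonDeserializer<>(Name.class, false);
        valueDeserializer.addTrustedPackages("kafka.topic.queue.entity");

        // Deserializer instances passed here take precedence over class names in the properties.
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), valueDeserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Name> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Name> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
```

With this in place, the `key-deserializer`, `value-deserializer`, and `spring.json.*` consumer entries in application.yml should no longer be needed. Note that a record whose bytes are not valid JSON would still fail to parse, wherever the deserializer is configured.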

Regarding "java - Unable to deserialize data using Kafka and Spring Boot", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/60982754/
