apache-flink - Flink throws a Kryo error due to an Avro array type

Tags: apache-flink, avro, flink-streaming, kryo

I am getting the following error from the getProducedType method of my Flink deserializer:

com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
values (org.apache.avro.generic.GenericData$Record)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:189)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:355)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:85)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:152)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:624)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at org.apache.avro.generic.GenericData$Array.add(GenericData.java:277)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:378)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:289)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)

The deserializer:

// Imports assumed for this snippet (paths match Flink 1.4.x and Confluent's Avro serializers)
import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroDeserializer}
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema
import scala.collection.JavaConverters._
import scala.reflect.ClassTag

class AvroDeserializer[T <: GenericRecord : ClassTag](topic: String, schemaRegistryUrl: String) extends KeyedDeserializationSchema[T] {

  @transient lazy val keyDeserializer: KafkaAvroDeserializer = {
    val deserializer = new KafkaAvroDeserializer()
    deserializer.configure(
      Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
      true)
    deserializer
  }

  // Flink needs the serializer to be serializable => this "@transient lazy val" does the trick
  @transient lazy val valueDeserializer: KafkaAvroDeserializer = {
    val deserializer = new KafkaAvroDeserializer()
    deserializer.configure(
      // other schema-registry configuration parameters can be passed, see the configure() code
      // for details (among other things, schema cache size)
      Map(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl).asJava,
      false)
    deserializer
  }

  override def deserialize(messageKey: Array[Byte], message: Array[Byte],
                           topic: String, partition: Int, offset: Long): T = {
    valueDeserializer.deserialize(topic, message).asInstanceOf[T]
  }

  override def isEndOfStream(nextElement: T): Boolean = false

  override def getProducedType: TypeInformation[T] = {
    // For an interface like GenericRecord this resolves to a GenericTypeInfo,
    // so Flink falls back to Kryo: the serializer that fails in the trace above.
    TypeExtractor.getForClass(implicitly[ClassTag[T]].runtimeClass.asInstanceOf[Class[T]])
  }

}
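
For reference, this is roughly how the deserializer would be wired into a job. The topic name, registry URL, and consumer properties below are placeholders, and FlinkKafkaConsumer010 is assumed from the fetcher in the stack trace:

import java.util.Properties
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

val kafkaProps = new Properties()
kafkaProps.setProperty("bootstrap.servers", "broker:9092") // placeholder
kafkaProps.setProperty("group.id", "my-group")             // placeholder

// GenericRecord itself satisfies the T <: GenericRecord bound
val consumer = new FlinkKafkaConsumer010[GenericRecord](
  "my-topic",
  new AvroDeserializer[GenericRecord]("my-topic", "http://schema-registry:8081"),
  kafkaProps)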

From what I have read, Kryo has problems with Avro's array types, and my messages do contain arrays. If that is indeed the case, how can I deserialize my Kafka messages into a GenericRecord?

Best answer

I have run into this problem myself. It happens because Kryo cannot serialize Avro types correctly; in particular, Kryo instantiates Avro's GenericData.Array without the schema it needs, which is what appears to trigger the NullPointerException in GenericData$Array.add in the trace above.

To fix this, include the flink-avro library in your project, as described here: Avro support in Flink.
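
For example, with sbt (the version is an assumption based on the Kafka010Fetcher in the stack trace; pin it to whatever your cluster runs):

// build.sbt: flink-avro is published without a Scala suffix; 1.4.0 is an assumed version
libraryDependencies += "org.apache.flink" % "flink-avro" % "1.4.0"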

Once that is on the classpath, Flink should automatically pick up its special serializers for Avro types.

If that does not happen, you can also try setting the enableForceAvro() option, as described in Execution Configuration.
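
A minimal sketch of that setting (enableForceAvro() lives on the environment's ExecutionConfig):

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Prefer Flink's Avro serializer over the Kryo fallback for generic types
env.getConfig.enableForceAvro()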

The original question can be found on Stack Overflow: https://stackoverflow.com/questions/48067750/
