java - Flink Avro serialization shows "not serializable" error when using GenericRecords

Tags: java apache-kafka apache-flink avro confluent-schema-registry

I'm really struggling to get Flink to communicate properly with a running Kafka instance, using Avro schemas from the Confluent Schema Registry (for both the key and the value).

After some time of thinking and restructuring my program, this is how far I was able to push my implementation:

Producer method

    public static FlinkKafkaProducer<Tuple2<GenericRecord,GenericRecord>> kafkaAvroGenericProducer() {  
        final Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "--.-.-.--:9092");
        properties.put("schema.registry.url", "http://--.-.-.---:8081");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KVSerializationSchema.class.getName()); //wrong class should not matter
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KVSerializationSchema.class.getName()); //wrong class but should not matter


        return new FlinkKafkaProducer<Tuple2<GenericRecord,GenericRecord>>("flink_output",
                new GenericSerializer("flink_output", schemaK, schemaV, "http://--.-.-.---:8081"),
                properties, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);

    }
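
This snippet assumes that schemaK and schemaV are Avro Schema objects available in the surrounding class. For illustration only (none of this is from the original post), the key schema could be parsed from its JSON definition, mirroring the single H_EQUNR field that shows up in the error further down:

    import org.apache.avro.Schema;

    public class Schemas {
        // Illustrative stand-in for the schemaK field referenced above:
        // a record with the one H_EQUNR string field seen in the stack trace.
        public static final Schema schemaK = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Key\",\"fields\":"
              + "[{\"name\":\"H_EQUNR\",\"type\":\"string\"}]}");
    }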

GenericSerializer.java

package com.reeeliance.flink;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import flinkfix.ConfluentRegistryAvroSerializationSchema;

public class GenericSerializer implements KafkaSerializationSchema<Tuple2<GenericRecord,GenericRecord>>{

    private String topic;   
    private Schema schemaKey;
    private Schema schemaValue;
    private String registryUrl;

    public GenericSerializer(String topic, Schema schemaK, Schema schemaV, String url) {
        super();
        this.topic = topic;
        this.schemaKey = schemaK;
        this.schemaValue = schemaV;
        this.registryUrl = url;
    }

    public GenericSerializer() {
        super();
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Tuple2<GenericRecord,GenericRecord> element, Long timestamp) {
        byte[] key = ConfluentRegistryAvroSerializationSchema.forGeneric(topic + "-key", schemaKey, registryUrl).serialize(element.f0);
        byte[] value = ConfluentRegistryAvroSerializationSchema.forGeneric(topic + "-value", schemaValue, registryUrl).serialize(element.f1);

        return new ProducerRecord<byte[], byte[]>(topic, key, value);
    }

}

However, when I execute the job, it fails during the preparation stage, before the job actually runs, with the following error:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: [H_EQUNR type:STRING pos:0] is not serializable. The object probably contains or references non serializable fields.
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:151)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:126)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:71)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:617)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:571)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.<init>(FlinkKafkaProducer.java:547)
    at com.reeeliance.flink.StreamingJob.kafkaAvroGenericProducer(StreamingJob.java:257)
    at com.reeeliance.flink.StreamingJob.main(StreamingJob.java:84)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field
    - custom writeObject data (class "java.util.ArrayList")
    - root object (class "org.apache.avro.Schema$LockableArrayList", [H_EQUNR type:STRING pos:0])
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at java.util.ArrayList.writeObject(ArrayList.java:766)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1140)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:133)
    ... 8 more

I know that all classes have to implement the Serializable interface or be made transient, but I am not using any classes of my own, and the error does not point to a non-serializable function (as is usually the case with threads) but to a record or field. The field comes from the key schema, a schema containing only this one field. I assume my error lies somewhere in using GenericRecord, which does not implement the Serializable interface, but I see GenericRecord used for this kind of serialization a lot, so it doesn't really make sense to me.
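
For reference, the usual workaround when a non-serializable Avro Schema has to travel inside a Flink function is the transient pattern mentioned above: ship the schema as its JSON string (which is serializable) and re-parse it lazily on the task manager. A minimal sketch, with an illustrative class name that is not from the original post:

    import java.io.Serializable;
    import org.apache.avro.Schema;

    // Sketch: Schema is not Serializable, but its JSON representation is,
    // so ship the String and rebuild the Schema after deserialization.
    public class SerializableSchemaHolder implements Serializable {

        private final String schemaJson;   // serializable stand-in for the Schema
        private transient Schema schema;   // rebuilt lazily on the task manager

        public SerializableSchemaHolder(Schema schema) {
            this.schemaJson = schema.toString();
        }

        public Schema get() {
            if (schema == null) {
                schema = new Schema.Parser().parse(schemaJson);
            }
            return schema;
        }
    }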

The ConfluentRegistryAvroSerializationSchema class is taken from GitHub, as it is not yet included in the Flink version we are currently using (1.9.1). I included the necessary classes and adapted them, and I don't think this is the cause of my problem.

Can somebody help me debug this? I would also very much appreciate it if you could show me a different way to achieve the same goal; the incompatibility of Flink Avro and the Confluent Schema Registry has been driving me crazy so far.

Best Answer

The exception message tells you exactly which class is not serializable:

Caused by: java.io.NotSerializableException: org.apache.avro.Schema$Field

The problem lies in the Schema objects that you store in the fields of GenericSerializer.

You can try this instead:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import flinkfix.ConfluentRegistryAvroSerializationSchema;

public class GenericSerializer implements KafkaSerializationSchema<Tuple2<GenericRecord,GenericRecord>> {

    private final String topic;
    private final SerializationSchema<GenericRecord> keySerializer;
    private final SerializationSchema<GenericRecord> valueSerializer;

    public GenericSerializer(String topic, Schema schemaK, Schema schemaV, String url) {
        this.topic = topic;
        // Build the serializers once here, rather than once per record in serialize().
        // Only serializable fields remain; the raw Schema objects are not stored.
        this.keySerializer = ConfluentRegistryAvroSerializationSchema.forGeneric(topic + "-key", schemaK, url);
        this.valueSerializer = ConfluentRegistryAvroSerializationSchema.forGeneric(topic + "-value", schemaV, url);
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(Tuple2<GenericRecord,GenericRecord> element, Long timestamp) {
        byte[] key = keySerializer.serialize(element.f0);
        byte[] value = valueSerializer.serialize(element.f1);

        return new ProducerRecord<>(topic, key, value);
    }

}

ConfluentRegistryAvroSerializationSchema is serializable, so you can safely store it in a field of GenericSerializer.

It will also perform better, because the underlying structures are no longer re-instantiated for every incoming record.
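
If you want to catch this kind of error before submitting a job, you can push the object through plain Java serialization yourself; Flink's ClosureCleaner (via InstantiationUtil.serializeObject, visible in the stack trace above) does essentially the same thing. A small sketch under that assumption, with an illustrative helper name:

    import java.io.ByteArrayOutputStream;
    import java.io.ObjectOutputStream;

    public final class SerializabilityCheck {

        // Throws java.io.NotSerializableException for anything Flink cannot ship,
        // e.g. a GenericSerializer that still holds raw Schema fields.
        public static void assertSerializable(Object o) throws Exception {
            try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
                out.writeObject(o);
            }
        }
    }

For example, assertSerializable(new GenericSerializer("flink_output", schemaK, schemaV, url)) fails fast with the same NotSerializableException that job submission produced.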

Regarding java - Flink Avro serialization shows "not serializable" error when using GenericRecords, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/59982631/
