java - Flink 抛出 com.esotericsoftware.kryo.KryoException : java. lang.NullPointerException

标签 java apache-kafka apache-flink avro flink-streaming

我在我的 flink 流作业中看到了一个奇怪的行为。这是我的代码

        streamExecutionEnvironment.enableCheckpointing(checkPointInterval, CheckpointingMode.EXACTLY_ONCE);
        streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        ExecutionConfig executionConfig = streamExecutionEnvironment.getConfig();
        executionConfig.disableForceKryo();
        executionConfig.enableForceAvro();
        Path path = new Path(outputPath);
        CheckpointConfig config = streamExecutionEnvironment.getCheckpointConfig();
        config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        String mutateConfig = IOUtils.toString(EventProcessor.class.getClassLoader().getResourceAsStream(configFile));

        FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer(topics,
                new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
                properties);

flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true);
        DataStream<GenericRecord> dataStream = streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("booking_flow_source");


        DataStream<GenericRecord> enrichDataStream = dataStream.map(new MapFunction<GenericRecord, GenericRecord>() {
            private transient Mutator mutator;
            @Override
            public GenericRecord map(GenericRecord record)  {
                GenericRecord mutateRecord=record;
                try {
                    mutator = new Mutator(mutateConfig);
                    mutateRecord = mutator.mutate(record);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return mutateRecord;
            }
        });

        enrichDataStream.print();

这段代码到目前为止运行良好。现在我需要从我的 avro 模式生成 java 类,所以我已经包含了这个 avro 依赖项。

<dependency>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>
                <version>1.9.1</version>
</dependency>

在我的 pom 中包含这个之后,我的代码停止工作并且我得到异常:

org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator

com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
props (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)

即使我在我的代码中禁用了 kryo 并强制使用 avro,我仍然得到相同的异常。 如果我删除了此依赖项,则代码正在运行并且我的流正在打印。

所以我无法通过添加 avro 依赖项来理解发生了什么变化。

请帮忙

最佳答案

我遇到了类似的问题。我在 flinkconfiguration 中设置 classloader.resolve-order: parent-first 修复了它。

关于java - Flink 抛出 com.esotericsoftware.kryo.KryoException : java. lang.NullPointerException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61985623/

相关文章:

apache-flink - 为什么Flink中的env.readTextFile(...).first(10).print会读取所有数据?

java - 弗林克 : Window evaluation

java - 二维数组中静态一维数组的问题

java - 如何手动记录 JAX-RS 参数的 Swagger 数据模型?

java - JSch ChannelSftp.ls 抛出空指针

java - Kafka Mirror Maker 执行地点

Java 对 String 数组进行排序并返回不同的 Item

java - 有没有办法从 Spark 流作业中读取 Kafka 流中的特定偏移量?

apache-kafka - 增加 KafkaConsumer 单次轮询中读取的记录数

apache-kafka - flink kafka 生产者和消费者中恰好一次