我在我的 flink 流作业中看到了一个奇怪的行为。这是我的代码
streamExecutionEnvironment.enableCheckpointing(checkPointInterval, CheckpointingMode.EXACTLY_ONCE);
streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
ExecutionConfig executionConfig = streamExecutionEnvironment.getConfig();
executionConfig.disableForceKryo();
executionConfig.enableForceAvro();
Path path = new Path(outputPath);
CheckpointConfig config = streamExecutionEnvironment.getCheckpointConfig();
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
String mutateConfig = IOUtils.toString(EventProcessor.class.getClassLoader().getResourceAsStream(configFile));
FlinkKafkaConsumer flinkKafkaConsumer = new FlinkKafkaConsumer(topics,
new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
properties);
flinkKafkaConsumer.setCommitOffsetsOnCheckpoints(true);
DataStream<GenericRecord> dataStream = streamExecutionEnvironment.addSource(flinkKafkaConsumer).name("booking_flow_source");
DataStream<GenericRecord> enrichDataStream = dataStream.map(new MapFunction<GenericRecord, GenericRecord>() {
private transient Mutator mutator;
@Override
public GenericRecord map(GenericRecord record) {
GenericRecord mutateRecord=record;
try {
mutator = new Mutator(mutateConfig);
mutateRecord = mutator.mutate(record);
} catch (Exception e) {
e.printStackTrace();
}
return mutateRecord;
}
});
enrichDataStream.print();
这段代码到目前为止运行良好。现在我需要从我的 avro 模式生成 java 类,所以我已经包含了这个 avro 依赖项。
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.9.1</version>
</dependency>
在我的 pom 中包含这个之后,我的代码停止工作并且我得到异常:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
props (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
即使我在我的代码中禁用了 kryo 并强制使用 avro,我仍然得到相同的异常。 如果我删除了此依赖项,则代码正在运行并且我的流正在打印。
所以我无法通过添加 avro 依赖项来理解发生了什么变化。
请帮忙
最佳答案
我遇到了类似的问题。我在 flinkconfiguration 中设置 classloader.resolve-order: parent-first 修复了它。
关于java - Flink 抛出 com.esotericsoftware.kryo.KryoException : java. lang.NullPointerException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61985623/