apache-flink - Getting the following ClassCastException when adding the job jar to $FLINK_HOME/lib

Tags: apache-flink flink-streaming

According to the Flink documentation on avoiding dynamic class loading, the following is possible: when running a setup where the Flink JobManager and TaskManagers are dedicated to one particular job, the JAR files can be put directly into the /lib folder to make sure they are part of the classpath, rather than being loaded via dynamic class loading.

However, when the jars are added to the /lib folder, the following exception is thrown. Is there any workaround for this error?

   org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235) ~[iot-mirror-device.jar:na]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:95) ~[iot-mirror-device.jar:na]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231) ~[iot-mirror-device.jar:na]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) ~[iot-mirror-device.jar:na]
        at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_91]
Caused by: java.lang.ClassCastException: cannot assign instance of org.apache.commons.collections.map.LinkedMap to field org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.pendingOffsetsToCommit of type org.apache.commons.collections.map.LinkedMap in instance of org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133) ~[na:1.8.0_91]
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305) ~[na:1.8.0_91]
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2024) ~[na:1.8.0_91]
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) ~[na:1.8.0_91]
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) ~[na:1.8.0_91]
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) ~[na:1.8.0_91]
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018) ~[na:1.8.0_91]
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942) ~[na:1.8.0_91]
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808) ~[na:1.8.0_91]
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353) ~[na:1.8.0_91]
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373) ~[na:1.8.0_91]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290) ~[iot-mirror-device.jar:na]
        at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248) ~[iot-mirror-device.jar:na]
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220) ~[iot-mirror-device.jar:na]
        ... 4 common frames omitted
Timestamp=2018-03-26 13:46:42,433 LogLevel=INFO  ThreadId=[flink-akka.actor.default-dispatcher-6] Class=o.a.f.r.e.ExecutionGraph Msg=Source: Custom Source -> Sink: Unnamed (1/1) (3f12f6953a235eb43f07cdf7966b5fcf) switched from RUNNING to FAILED.

Best Answer

The LinkedMap class is being loaded twice, by two different classloaders, and an instance from one is being assigned to a field whose type comes from the other. This is the classic "X cannot be cast to X" kind of error.
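
As a quick check (a minimal diagnostic sketch, not part of the original answer; the ClassLoaderCheck class name is hypothetical), printing the classloader and code source of LinkedMap from different contexts should reveal whether two copies of the class are in play:

    // Minimal diagnostic sketch: print which classloader served LinkedMap
    // and which jar it was actually loaded from.
    import org.apache.commons.collections.map.LinkedMap;

    public class ClassLoaderCheck {
        public static void main(String[] args) {
            Class<?> clazz = LinkedMap.class;
            // The classloader that served this copy of the class.
            System.out.println("ClassLoader: " + clazz.getClassLoader());
            // getCodeSource() can be null for bootstrap classes, but not for a
            // class shipped in a jar such as commons-collections.
            System.out.println("CodeSource : "
                    + clazz.getProtectionDomain().getCodeSource().getLocation());
        }
    }

The same two println lines can also be copied into an operator's open() method; if the locations reported on the client and on a TaskManager differ, the class is present both in $FLINK_HOME/lib and in the dynamically loaded user jar.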

A common cause is a library that is not compatible with Flink's inverted (child-first) class loading. It can be worked around by adding the following setting to conf/flink-conf.yaml and restarting Flink:

classloader.resolve-order: parent-first

This should resolve the problem.

Regarding "apache-flink - Getting the following ClassCastException when adding the job jar to $FLINK_HOME/lib", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/49530333/
