鉴于这个虚拟代码:
1 case class MyObject(values:mutable.LinkedHashMap[String, String])
...
2 implicit val typeInfoString:TypeInformation[String] = TypeInformation.of(classOf[String])
3 implicit val typeInfoMyObject:TypeInformation[MyObject] = TypeInformation.of(classOf[MyObject])
4
5 val env = StreamExecutionEnvironment.getExecutionEnvironment
6
7 env
8 .fromElements("one")
9 .map(str =>
10 {
11 val obj = MyObject(mutable.LinkedHashMap("key" -> str))
12 val filteredMap1:mutable.LinkedHashMap[String, String] = obj.values.filter(!_._2.contains("bla"))
13
14 obj
15 })
16 .map(obj =>
17 {
18 val filteredMap2:mutable.LinkedHashMap[String, String] = obj.values.filter(!_._2.contains("bla"))
19
20 obj
21 })
应用程序将在第 18 行崩溃,但异常:
Caused by: java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be cast to scala.collection.mutable.LinkedHashMap
问题似乎是通过序列化/反序列化
values
成员更改其对象类型,或者换句话说,LinkedHashMap
变成 HashMap
.请注意,与第 18 行相同的代码在第 12 行中完美运行。
在第 12 行设置断点时,
obj.values
将显示为 LinkedHashMap
通过调试器/IntelliJ,但是第 18 行中的断点将显示 obj.values
如 HashMap
在调试器中。这里发生了什么?我怎样才能解决这个问题?毕竟
LinkedHashMap
工具Serializable
?!
最佳答案
LinkedHashMap
的默认 Kryo Chill 序列化程序不保留 map 类型,而是将数据反序列化为 HashMap
.为了避免这种情况,需要为 LinkedHashMap
创建一个序列化程序。类型:
class LinkedHashMapSerializer[K, V] extends Serializer[mutable.LinkedHashMap[K, V]] with Serializable {
override def write(kryo: Kryo, output: Output, `object`: mutable.LinkedHashMap[K, V]): Unit = {
kryo.writeObject(output, `object`.size)
for (elem <- `object`.iterator) {
kryo.writeClassAndObject(output, elem._1)
kryo.writeClassAndObject(output, elem._2)
}
}
override def read(kryo: Kryo, input: Input, `type`: Class[mutable.LinkedHashMap[K, V]]): mutable.LinkedHashMap[K, V] = {
val result = new mutable.LinkedHashMap[K, V]()
val size = kryo.readObject(input, classOf[Int])
for (_ <- 1 to size) {
val key = kryo.readClassAndObject(input).asInstanceOf[K]
val value = kryo.readClassAndObject(input).asInstanceOf[V]
result.put(key, value)
}
result
}
}
然后将其注册为 Kryo
Serializer
:val env = StreamExecutionEnvironment.getExecutionEnvironment
env.registerTypeWithKryoSerializer(classOf[mutable.LinkedHashMap[String, String]], new LinkedHashMapSerializer())
关于scala - LinkedHashMap 更改为 HashMap 并在 flink 数据流算子中崩溃,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55614459/