scala - LinkedHashMap 更改为 HashMap 并在 flink 数据流算子中崩溃

标签 scala exception serialization hashmap apache-flink

鉴于这个虚拟代码:

 1 case class MyObject(values:mutable.LinkedHashMap[String, String])

...

 2    implicit val typeInfoString:TypeInformation[String] = TypeInformation.of(classOf[String])
 3    implicit val typeInfoMyObject:TypeInformation[MyObject] = TypeInformation.of(classOf[MyObject])
 4
 5    val env = StreamExecutionEnvironment.getExecutionEnvironment
 6
 7    env
 8      .fromElements("one")
 9      .map(str =>
10      {
11        val obj = MyObject(mutable.LinkedHashMap("key" -> str))
12        val filteredMap1:mutable.LinkedHashMap[String, String] = obj.values.filter(!_._2.contains("bla"))
13
14        obj
15      })
16      .map(obj =>
17      {
18        val filteredMap2:mutable.LinkedHashMap[String, String] = obj.values.filter(!_._2.contains("bla"))
19
20        obj
21      })

应用程序将在第 18 行崩溃,但异常:
Caused by: java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be cast to scala.collection.mutable.LinkedHashMap

问题似乎是通过序列化/反序列化 values成员更改其对象类型,或者换句话说,LinkedHashMap变成 HashMap .

请注意,与第 18 行相同的代码在第 12 行中完美运行。

在第 12 行设置断点时,obj.values将显示为 LinkedHashMap通过调试器/IntelliJ,但是第 18 行中的断点将显示 obj.valuesHashMap在调试器中。

这里发生了什么?我怎样才能解决这个问题?毕竟LinkedHashMap工具Serializable ?!

最佳答案

LinkedHashMap 的默认 Kryo Chill 序列化程序不保留 map 类型,而是将数据反序列化为 HashMap .为了避免这种情况,需要为 LinkedHashMap 创建一个序列化程序。类型:

class LinkedHashMapSerializer[K, V] extends Serializer[mutable.LinkedHashMap[K, V]] with Serializable {
  override def write(kryo: Kryo, output: Output, `object`: mutable.LinkedHashMap[K, V]): Unit = {
    kryo.writeObject(output, `object`.size)

    for (elem <- `object`.iterator) {
      kryo.writeClassAndObject(output, elem._1)
      kryo.writeClassAndObject(output, elem._2)
    }
  }

  override def read(kryo: Kryo, input: Input, `type`: Class[mutable.LinkedHashMap[K, V]]): mutable.LinkedHashMap[K, V] = {
    val result = new mutable.LinkedHashMap[K, V]()
    val size = kryo.readObject(input, classOf[Int])
    for (_ <- 1 to size) {
      val key = kryo.readClassAndObject(input).asInstanceOf[K]
      val value = kryo.readClassAndObject(input).asInstanceOf[V]
      result.put(key, value)
    }

    result
  }
}

然后将其注册为 Kryo Serializer :
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.registerTypeWithKryoSerializer(classOf[mutable.LinkedHashMap[String, String]], new LinkedHashMapSerializer())

关于scala - LinkedHashMap 更改为 HashMap 并在 flink 数据流算子中崩溃,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55614459/

相关文章:

java - 如何将 Scala 列表转换为 Java ArrayList

scala - 将工作分配给多个核心 : Hadoop or Scala's parallel collections?

c# - 具有属性的集合的 Xml 序列化

java - 为什么序列号版本的UID使用随机数?

斯卡拉: "class needs to be abstract, since method is not defined"错误

scala - 用于理解 if 守卫

java - 小程序:关闭时触发异常

c# - 当 Application.Run() 发生异常时,您有什么选择?

java - Slf4j with Log4j 在wrapper exception有消息时不打印wrapped exception (caused by)

c++ - 使用指向对象的指针 vector 进行 boost (C++ - linux)