scala - apache flink 的 union 类型混淆?

标签 scala apache-flink

我尝试合并多个 flink 数据集。它们包含在 Seq 中。以下是产生问题的代码

case class clickZap ( date: LocalDateTime, stbId:String, channelId :Int , nozap:Boolean)
val afterLastz: DataSet[clickZap]= ... 

val ma_range: IndexedSeq[DataSet[(Int, Option[(java.time.LocalDateTime, String, Int, Boolean)])]]  = for (i  <- Range (0,min_n))
      yield afterLastz.reduceGroup(it =>(i, maxBeforezTCZ(it,at plusMinutes(i))))
//val ma_all =  ma_range.slice(1, min_n).foldLeft(ma_range.head)(_ union _)
val ma_all = ma_range.head union(ma_range.tail.head)

我得到的是一个

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Cannot union inputs of different types. Input1=scala.Tuple2(_1: Integer, _2: Option[scala.Tuple4(_1: GenericType [java.time.LocalDateTime], _2: String, _3: Integer, _4: Boolean)]), input2=scala.Tuple2(_1: Integer, _2: Option[scala.Tuple4(_1: GenericType[java.time.LocalDateTime], _2: String, _3: Integer, _4: Boolean)])

我错过了什么?类型没有不同,不是吗?联合运算符应该很便宜,因此规避这个问题似乎没有吸引力。 我提供了前两行代码作为参数,表明数据集中的数据类型是相同的。 我使用的是flink版本0.9.0和0.9.1

最佳答案

问题是 Flink 自己的打字系统中的一个错误。表示 Scala Option 的 OptionTypeInfo 未定义正确的 equals 方法。因此,未检测到两个 OptionTypeInfos 相等。

我创建了一个JIRA issue并开通了Pull Request解决问题。拉取请求应在两天内合并。如果您随后使用最新的 0.10-SNAPSHOT 版本,那么您的问题应该得到解决。

关于scala - apache flink 的 union 类型混淆?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32399619/

相关文章:

java 7 uuid 错误?

java - 使用 Apache Flink 处理 XML

apache-flink - 对随机数源中的数字求和

scala - 在Scala中使用self别名=>

scala - 将 Spark 流数据写入并附加到 HDFS 中的文本文件

sql - 尝试运行此 SQL 脚本时出现以下错误 : null,

postgresql - Slick 3批更新

java - Flink TaskManager 未重新连接到新的 Jobmanager

serialization - Flink流: Unexpected charaters in serialized String messages

java - Apache 弗林克 : Ordered timestamps with parallelism