scala - 如何在scala中使用flink折叠功能

标签 scala apache-flink

这是使用 Flink Fold 与 scala 匿名函数的无效尝试:

val myFoldFunction = (x: Double, t:(Double,String,String)) => x + t._1
env.readFileStream(...).
...
.groupBy(1)
.fold(0.0, myFoldFunction : Function2[Double, (Double,String,String), Double])

它编译得很好,但在执行时,我遇到了“类型删除问题”(见下文)。在 Java 中这样做很好,但当然更冗长。我喜欢简洁明了的 lambda。我怎样才能在scala中做到这一点?

Caused by: org.apache.flink.api.common.functions.InvalidTypesException:
Type of TypeVariable 'R' in 'public org.apache.flink.streaming.api.scala.DataStream org.apache.flink.streaming.api.scala.DataStream.fold(java.lang.Object,scala.Function2,org.apache.flink.api.common.typeinfo.TypeInformation,scala.reflect.ClassTag)' could not be determined. 
This is most likely a type erasure problem. 
The type extraction currently supports types with generic variables only in cases where all variables in the return type can be deduced from the input type(s).

最佳答案

您遇到的问题是Flink [1]中的一个错误。该问题源于 Flink 的 TypeExtractor 以及 Scala DataStream API 在 Java 实现之上的实现方式。 TypeExtractor 无法为 Scala 类型生成 TypeInformation,因此返回 MissingTypeInformation。此缺失的类型信息是在创建 StreamFold 运算符后手动设置的。但是,StreamFold 运算符的实现方式不接受 MissingTypeInformation,因此在设置正确的类型信息之前会失败。

我已打开拉取请求 [2] 来解决此问题。应该会在接下来的两天内合并。通过使用最新的 0.10 快照版本,您的问题应该得到解决。

关于scala - 如何在scala中使用flink折叠功能,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32378765/

相关文章:

java - 适用于每个输入

apache-flink - 如何阻止高负载导致级联 Flink 检查点故障

scala - Haskell 序列的模拟

使用即发即忘 Futures 进行 Scala 测试

apache-flink - 为什么在申请任务管理器时 '-n' 或 '-yn' 之类的参数不起作用

apache-flink - 如何处理 Apache Flink 中的 transient /应用程序故障?

spring - 使用 Scala 规范对 Spring 应用程序进行单元测试

scala - 为什么 spark 排序比 scala 原始排序方法慢

scala - Deequ 检查的结果数据帧的列有何含义?

java - 弗林克 : How to pass a dynamic path while writing to files using writeFileAsText(path)?