scala - 弗林克 : How to write DataSet to a variable instead of to a file

标签 scala apache-flink

我有一个使用 DataSet API 用 scala 编写的 flink 批处理程序,它产生了我感兴趣的最终数据集。我想将该数据集作为变量或值(例如字符串列表或序列)在我的程序,而不必将其写入任何文件。可能吗?

我已经看到 flink 允许收集数据接收器以便进行调试(他们文档中的唯一示例是 Java)。但是,这只允许在本地执行,而且我不知道它在 Scala 中的等价物。我想要的是在对程序值或变量完成整个 flink 并行执行后写入最终结果数据集。

最佳答案

首先,尝试使用 Scala 版本的集合数据接收器: 导入 org.apache.flink.api.scala._ 导入 org.apache.flink.api.java.io.LocalCollectionOutputFormat;

 .
 .
val env = ExecutionEnvironment.getExecutionEnvironment

// Create a DataSet from a list of elements
val words = env.fromElements("w1","w2", "w3")

var outData:java.util.List[String]= new java.util.ArrayList[String]()
words.output(new LocalCollectionOutputFormat(outData))

// execute program
env.execute("Flink Batch Scala")
println(outData)

其次,如果你的数据集适合单机内存,为什么你需要使用分布式处理框架?我认为您应该更多地考虑您的用例!并尝试使用正确的transformations在你的数据集上。

关于scala - 弗林克 : How to write DataSet to a variable instead of to a file,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46971444/

相关文章:

scala - Flink 通用 Avro 解串器 : override getProducedType

apache-flink - Apache Flink 检查点卡住

hadoop-yarn - 当 flink 作业失败时,Yarn 报告 flink 作业为 FINISHED 和 SUCCEED

scala - 使用 DataFrame API 时,自联接无法按预期工作

多个隐式参数的Scala解析

scala - 具有类型参数限制的 Scala 泛型类的条件方法

java - 如何实现 FlinkKafkaPartitioner?

apache-flink - 如何处理 Apache Flink 中很少更新的大型查找表

scala - 创建RDD时spark报错RDD类型未找到

java - 在 Play Framework 中跨 Controller 和 View 维护请求参数