我有一个由文件夹和文件构成的数据集。文件夹/文件结构本身对于数据分析很重要。
数据集的结构:
folder1
+-----file11
+-----column1
+-----column2
每个文件都包含描述一个对象的数据。文件的格式是一致的。它基本上是一个包含两列的 csv 文件。这两列应表示为结果对象中的元组序列。
文件的大小非常小。仅最多 20 kb。每个文件夹包含大约 200 个文件。
所需的输出对象应该是:
{
a: "folder1", // name of parent folder
b: "file11", // name of content file
c: Seq[(String, String)] // content of file1
}
如何在 Scala 中处理该数据集的读取?
最佳答案
有两种方法可以解决这个问题:
a) 如果文件夹中的数据非常小(小于几兆字节),您可以在本地进行读取并使用 ExecutionEnvironment.fromCollection() 方法将数据带入 Flink工作。
b) 您创建一个自定义输入格式。 InputFormat 允许解析自定义文件格式。在您的情况下,我将扩展 TextInputFormat
并覆盖 readRecord()
方法。此方法将文件中的每一行作为字符串提供。
然后,您可以手动解析 String 中的数据,并将解析结果与 Tuple3 中的目录信息一起返回。您可以从 filePath
变量访问该路径。
对于使用 FileInputFormat
递归读取文件,有 recursive.file.enumeration
配置值。
关于scala - 在 Apache Flink 中从输入文件创建对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30599616/