scala - 在 Apache Flink 中从输入文件创建对象

标签 scala apache-flink

我有一个由文件夹和文件构成的数据集。文件夹/文件结构本身对于数据分析很重要。

数据集的结构:

folder1
   +-----file11
            +-----column1
            +-----column2

每个文件都包含描述一个对象的数据。文件的格式是一致的。它基本上是一个包含两列的 csv 文件。这两列应表示为结果对象中的元组序列。

文件的大小非常小。仅最多 20 kb。每个文件夹包含大约 200 个文件。

所需的输出对象应该是:

{
    a: "folder1",              // name of parent folder
    b: "file11",               // name of content file
    c: Seq[(String, String)]   // content of file1
}

如何在 Scala 中处理该数据集的读取?

最佳答案

有两种方法可以解决这个问题:

a) 如果文件夹中的数据非常小(小于几兆字节),您可以在本地进行读取并使用 ExecutionEnvironment.fromCollection() 方法将数据带入 Flink工作。

b) 您创建一个自定义输入格式。 InputFormat 允许解析自定义文件格式。在您的情况下,我将扩展 TextInputFormat 并覆盖 readRecord() 方法。此方法将文件中的每一行作为字符串提供。 然后,您可以手动解析 String 中的数据,并将解析结果与 Tuple3 中的目录信息一起返回。您可以从 filePath 变量访问该路径。 对于使用 FileInputFormat 递归读取文件,有 recursive.file.enumeration 配置值。

关于scala - 在 Apache Flink 中从输入文件创建对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30599616/

相关文章:

java - 使用单线程 ForkJoinPool 在 Future 内运行 Future.map

scala - 在云中部署和托管 scala?

mysql - Scala 异常检索插入的 id

scala - 为什么我的嵌套伴生对象的成员在类中不自动可见?

scala - 什么是基于规则验证的 Scala 惯用方法?

amazon-web-services - 无法使用 StreamExecutionEnvironment 使用 S3 接收器写入 S3 - Apache Flink 1.1.4

apache-flink - Apache Flink 中的不可序列化对象

java - Maven 从 flink 项目创建两个 jar,其中一个仅 80mb 在集群中工作

java - Flink 维护配置状态

logging - 如何在 Flink 作业执行期间记录未捕获的异常