scala - 在 Spark 中发现和读取多个文件

标签 scala apache-spark

有不同的系统,它们有不同的文件集(txt,csv)要加载和转换并写入输出文件
使用 Apache Spark/Scala。
假设 SystemA 有 3 个文件,SystemB 在各自的目录中有 2 个文件。

FileType       |FileNames
-----------------------------------------
Customer       |Customer_20190301.csv
Account        |Account_20190301.csv
Order          |Order_20190301.csv
OrderDetails   |OrderDetails_20190301.txt
Transactions   |Transactions_20190301.txt

现在我想根据作为输入给出的系统名称获取文件名和路径,以便我可以加载它们各自的系统文件。
我不想为每个系统创建单独的程序并加载它们的文件,因为文件名或路径将来可能会改变。

有没有一种有效的方法来处理这个问题?使用配置文件?
或者可能正在使用或不使用任何外部库?请指导我。

最佳答案

这个问题很适合采用分而治之的方法:

  • 描述系统的数量 + 参数化进一步处理所需的任何参数。你如何做到这一点取决于你的部署环境、选择的语言等。没有一个正确的答案。
  • 将(1)中的信息读入数据结构。
  • 使用(2)和(递归)目录列表的某种组合生成要处理的文件列表。请注意,给定路径,您可以使用 FileSystem.get(new java.net.URI(path), new Configuration()) 在 Spark 中获取 Hadoop 文件系统。 .
  • 按类型对文件进行分组。
  • 对于每个组,参数化一个 DataFrameReader来自 spark.read适本地并使用 .load(paths: _*) 调用负载的多路径版本.您可以通过将组名映射到返回 DataFrameReader 的函数来概括此代码。 .

  • 以下是如何执行 (5) 的示例:
    val readers: Map[String, SparkSession => DataFrameReader] = Map(
      "customer" -> ((spark: SparkSession) => spark.read.option("format", "csv"))
    )
    
    val groups: Map[String, Seq[String]] = ???
    
    groups.map { case (groupName, paths) =>
      readers(groupName)(spark).load(paths: _*)
    }
    

    关于scala - 在 Spark 中发现和读取多个文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55203493/

    相关文章:

    scala - Apache-Spark 内部作业调度

    scala - 如何基于列值是否在Spark DataFrame的一组字符串中来过滤行

    java - Spark/Scala - 从 Json 创建 DataFrame 时出错 : java. lang.NoSuchMethodError : org. apache.spark.sql.DataFrameReader.json

    scala - 如何将 hiveContext 作为参数传递给函数 spark scala

    postgresql - Play Framework 中带有 Postgres 的 Slick 代码生成器

    performance - DStream 的分区(用于 updateStateByKey() )如何工作以及如何验证它?

    apache-spark - 如何使用pyspark对数据框中的两列进行数学运算

    python - PySpark:如何在 rdd join 期间从左表中选择 *

    scala - 尝试在 IntelliJ 中添加 Spark 依赖项时出现 OpenJDK Server VM 和 Unresolved 依赖项警告

    scala - 为什么抽象覆盖失败