scala - Spark DataFrame 并行性

标签 scala hadoop apache-spark apache-spark-sql

下面是我使用 Apache Spark 的用例

1) 我在 HDFS 上有大约 2500 个 Parquet 文件,文件大小因文件而异。

2) 我需要处理每个 parquet 文件并构建一个新的 DataFrame 并将一个新的 DataFrame 写入 orc 文件格式。

3)我的Spark驱动程序是这样的。 我正在迭代每个文件,处理单个 Parquet 文件,创建一个新的 DataFrame 并将一个新的 DataFrame 编写为 ORC,下面是代码片段。

  val fs = FileSystem.get(new Configuration())
  val parquetDFMap = fs.listStatus(new Path(inputFilePath)).map(folder => {
  (folder.getPath.toString, sqlContext.read.parquet(folder.getPath.toString))})

parquetDFMap.foreach {
  dfMap =>
    val parquetFileName = dfMap._1
    val parqFileDataFrame = dfMap._2
    for (column <- parqFileDataFrame.columns) 
    {
       val rows = parqFileDataFrame.select(column)
            .mapPartitions(lines => lines.filter(filterRowsWithNullValues(_))
            .map(row => buildRowRecords(row, masterStructArr.toArray, valuesArr)))
        val newDataFrame: DataFrame = parqFileDataFrame.sqlContext.createDataFrame(rows, StructType(masterStructArr))
       newDataFrame.write.mode(SaveMode.Append).format("orc").save(orcOutPutFilePath+tableName)
    }
}

这种设计的问题我只能及时处理一个 parquet 文件,只有当我创建一个新的数据帧并且将新的数据帧写入 ORC 格式时才应用并行性。因此,如果创建新的 DataFrame 或将新的 DataFrame 写入 ORC 等任何任务需要很长时间才能完成,其他排队的 parquet 处理就会卡住,直到当前的 parquet 操作完成。

能否请您帮助我为这个用例提供更好的方法或设计。

最佳答案

你能为所有 parquet 文件创建一个数据框而不是为每个文件创建一个数据框吗

val df =  sqlContext.read.parquet(inputFilePath)
df.map(row => convertToORc(row))

关于scala - Spark DataFrame 并行性,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37838950/

相关文章:

斯卡拉 : reassignment to val

Hadoop mapreduce 出现 "Cannot resolve the host name"错误

java - 运行hadoop作业: Class not found - org. apache.tools.ant.launch.AntMain

java - 基于第二个DataFrame的DataFrame过滤

hadoop - 是否可以在 SPARK 中覆盖 Hadoop 配置?

Scala-IDE - 具有不同签名的对象 `apply` 方法混合使用了吗?

Scala - 当依赖类也使用相同的泛型类型时,使用 guice 注入(inject)泛型类型

scala - 在scala中使用“迭代Seq或如果为空”更好的版本?

hadoop - JBoss Drools 与 Apache Hadoop 的集成

java - 在同一 JVM 中检测到多个正在运行的 SparkContext - Java Spark