apache-spark - Spark Parquet 装载机 : Reduce number of jobs involved in listing a dataframe's files

标签 apache-spark pyspark

我正在通过

将 Parquet 数据加载到数据框中
spark.read.parquet('hdfs:///path/goes/here/...')

由于 parquet 分区,该路径中约有 50k 个文件。当我运行该命令时,spark 会生成数十个小作业,这些作业总体上需要几分钟才能完成。以下是 Spark UI 中作业的样子:

enter image description here

正如您所看到的,虽然每个作业都有大约 2100 个任务,但它们执行速度很快,大约需要 2 秒。启动如此多的“迷你作业”效率很低,导致此文件列出步骤需要大约 10 分钟(其中集群资源大部分处于空闲状态,并且集群主要处理分散的任务或管理作业/任务的开销)。

如何将这些任务合并为更少的作业,而每个作业又包含更多的任务? 也适用于 pyspark 的解决方案的奖励积分。

我正在 hadoop 2.8.3 上通过 pyspark 运行 Spark 2.2.1。

最佳答案

我相信您遇到了一个错误,我的一位前同事已为此提交了票证并提出了拉取请求。您可以查看here 。如果它适合您的问题,您最好的办法可能是对该问题进行投票并在邮件列表上发出一些关于它的声音。

您可能想要做的是调整 spark.sql.sources.parallelPartitionDiscovery.thresholdspark.sql.sources.parallelPartitionDiscovery.parallelism 配置参数(使用前者在链接票证中被引用)以适合您工作的方式。

您可以看看herehere查看如何使用配置 key 。为了完整起见,我将在此处分享相关片段。

spark.sql.sources.parallelPartitionDiscovery.threshold

// Short-circuits parallel listing when serial listing is likely to be faster.
if (paths.size <= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
  return paths.map { path =>
    (path, listLeafFiles(path, hadoopConf, filter, Some(sparkSession)))
  }
}

spark.sql.sources.parallelPartitionDiscovery.parallelism

// Set the number of parallelism to prevent following file listing from generating many tasks
// in case of large #defaultParallelism.
val numParallelism = Math.min(paths.size, parallelPartitionDiscoveryParallelism)

此配置的阈值默认值为 32,并行度默认值为 10000(相关代码 here )。

<小时/>

就您而言,我想说您可能想要做的是设置阈值,以便进程在不生成并行作业的情况下运行。

注意

链接的源来自撰写本文时最新的可用标记版本 2.3.0。

关于apache-spark - Spark Parquet 装载机 : Reduce number of jobs involved in listing a dataframe's files,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49133228/

相关文章:

apache-spark - 根据 Spark 中的条件获取行索引

apache-spark - 重新启动 Spark 结构化流作业会消耗数百万条 Kafka 消息并死掉

python - 如何在pyspark数据框中转换 "DD/MM/YYYY"格式的日期?

apache-spark - PySpark 删除所有特殊字符的所有列名中的特殊字符

python - 在一次操作中使用 spark 通过 reduceByKey 查找值范围

apache-spark - spark数据帧中过滤器的多个条件

hadoop - 使用Hadoop工具集匹配地理位置

scala - 当数据尚未加载时不可能获得工件。 IvyNode = org.scala-lang#scala-library;2.10.3

python - pyspark 上使用 Spark 的代码

apache-spark - 在 Spark 中压缩序列文件?