apache-spark - Spark 2.3 是否改变了它处理小文件的方式?

标签 apache-spark pyspark bigdata apache-spark-2.0

我刚开始使用 Spark 2+(2.3 版本),在查看 Spark UI 时发现了一些奇怪的东西。
我在 HDFS 集群中有一个目录列表,总共包含 24000 个小文件。

当我想对它们运行 Spark 操作时,Spark 1.5 会为每个输入文件生成一个单独的任务,就像我之前一直使用的那样。我知道每个 HDFS 块(在我的例子中一个小文件是一个块)在 Spark 中生成一个分区,每个分区由一个单独的任务处理。

Spark 1.5 UI screenshot

此外,命令 my_dataframe.rdd.getNumPartitions()输出 24000。

现在关于 Spark 2.3
在同一输入上,命令 my_dataframe.rdd.getNumPartitions()输出 1089。Spark UI 还为我的 Spark 操作生成 1089 个任务。您还可以看到,spark 2.3 中生成的工作数量大于 1.5

Spark 2.3 UI screenshot

两个 Spark 版本的代码相同(我需要稍微更改数据框、路径和列名,因为它是工作代码):

%pyspark
dataframe = sqlContext.\
                read.\
                parquet(path_to_my_files)
dataframe.rdd.getNumPartitions()
dataframe.\
    where((col("col1") == 21379051) & (col("col2") == 2281643649) & (col("col3") == 229939942)).\
    select("col1", "col2", "col3").\
    show(100, False)

这是生成的物理计划
dataframe.where(...).select(...).explain(True)

Spark 1.5
== Physical Plan ==
Filter (((col1 = 21379051) && (col2 = 2281643649)) && (col3 = 229939942))
 Scan ParquetRelation[hdfs://cluster1ns/path_to_file][col1#27,col2#29L,col3#30L]
Code Generation: true

Spark 2.3
== Physical Plan ==
*(1) Project [col1#0, col2#2L, col3#3L]
+- *(1) Filter (((isnotnull(col1#0) && (col1#0 = 21383478)) && (col2 = 2281643641)) && (col3 = 229979603))
   +- *(1) FileScan parquet [col1,col2,col3] Batched: false, Format: Parquet, Location: InMemoryFileIndex[hdfs://cluster1ns/path_to_file..., PartitionFilters: [], PushedFilters: [IsNotNull(col1)], ReadSchema: struct<col1:bigint,col2:bigint,col3:bigint>....

以上作业是使用 pyspark 从 zeppelin 生成的。
还有其他人在 spark 2.3 中遇到过这种情况吗?
我不得不说我喜欢处理多个小文件的新方法,但我也想了解可能的内部 Spark 更改。

我在互联网上搜索了最新的书“Spark 权威指南”,但没有找到任何关于 Spark 生成工裁剪理计划的新方法的信息。

如果您有任何链接或信息,阅读会很有趣。
谢谢!

最佳答案


Spark 2.3 configuration

|spark.files.maxPartitionBytes| 134217728 (128 MB) |读取文件时打包到单个分区的最大字节数。

关于apache-spark - Spark 2.3 是否改变了它处理小文件的方式?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49531252/

相关文章:

apache-spark - 使用列值作为spark DataFrame函数的参数

apache-spark - 为什么 Zeppelin-Spark 解释器没有挂载 Kubernetes 服务帐户

python - 多处理 RDD 列表

java - pig : UDF not returning expected resultset

hadoop - 使用Hadoop Map reduce处理和拆分大数据?

python - 有什么方法可以在 Spark Dataframe 的组数据上运行 stat 函数交叉表?

python - 将 csv 字典列转换为行 pyspark

hadoop - 连接CDH 5.4中的 yarn 簇上的 Spark

apache-spark - Hive:创建表时将结构体作为映射类型的键

database - ElasticSearch "match_all"慢, "from"> 1M,+10M 文档