我刚开始使用 Spark 2+(2.3 版本),在查看 Spark UI 时发现了一些奇怪的东西。
我在 HDFS 集群中有一个目录列表,总共包含 24000 个小文件。
当我想对它们运行 Spark 操作时,Spark 1.5 会为每个输入文件生成一个单独的任务,就像我之前一直使用的那样。我知道每个 HDFS 块(在我的例子中一个小文件是一个块)在 Spark 中生成一个分区,每个分区由一个单独的任务处理。
此外,命令 my_dataframe.rdd.getNumPartitions()
输出 24000。
现在关于 Spark 2.3
在同一输入上,命令 my_dataframe.rdd.getNumPartitions()
输出 1089。Spark UI 还为我的 Spark 操作生成 1089 个任务。您还可以看到,spark 2.3 中生成的工作数量大于 1.5
两个 Spark 版本的代码相同(我需要稍微更改数据框、路径和列名,因为它是工作代码):
%pyspark
dataframe = sqlContext.\
read.\
parquet(path_to_my_files)
dataframe.rdd.getNumPartitions()
dataframe.\
where((col("col1") == 21379051) & (col("col2") == 2281643649) & (col("col3") == 229939942)).\
select("col1", "col2", "col3").\
show(100, False)
这是生成的物理计划
dataframe.where(...).select(...).explain(True)
Spark 1.5
== Physical Plan ==
Filter (((col1 = 21379051) && (col2 = 2281643649)) && (col3 = 229939942))
Scan ParquetRelation[hdfs://cluster1ns/path_to_file][col1#27,col2#29L,col3#30L]
Code Generation: true
Spark 2.3
== Physical Plan ==
*(1) Project [col1#0, col2#2L, col3#3L]
+- *(1) Filter (((isnotnull(col1#0) && (col1#0 = 21383478)) && (col2 = 2281643641)) && (col3 = 229979603))
+- *(1) FileScan parquet [col1,col2,col3] Batched: false, Format: Parquet, Location: InMemoryFileIndex[hdfs://cluster1ns/path_to_file..., PartitionFilters: [], PushedFilters: [IsNotNull(col1)], ReadSchema: struct<col1:bigint,col2:bigint,col3:bigint>....
以上作业是使用 pyspark 从 zeppelin 生成的。
还有其他人在 spark 2.3 中遇到过这种情况吗?
我不得不说我喜欢处理多个小文件的新方法,但我也想了解可能的内部 Spark 更改。
我在互联网上搜索了最新的书“Spark 权威指南”,但没有找到任何关于 Spark 生成工裁剪理计划的新方法的信息。
如果您有任何链接或信息,阅读会很有趣。
谢谢!
最佳答案
从
Spark 2.3 configuration
|spark.files.maxPartitionBytes| 134217728 (128 MB) |读取文件时打包到单个分区的最大字节数。
关于apache-spark - Spark 2.3 是否改变了它处理小文件的方式?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49531252/