在写 dataframe
时至 parquet
使用 partitionBy
:
df.write.partitionBy("col1","col2","col3").parquet(path)
我希望每个正在写入的分区都由一个单独的任务独立完成,并且与分配给当前 spark 作业的工作人员数量并行。
然而,在写入 Parquet 时,实际上一次只有一个 worker /任务在运行。那个 worker 正在循环通过每个分区并写出
.parquet
文件串行。为什么会这样 - 有没有办法在这个 spark.write.parquet
中强制并发手术?以下不是我想看到的(应该是
700%+
..)在另一篇文章中,我也尝试添加
repartition
在前Spark parquet partitioning : Large number of files
df.repartition("col1","col2","col3").write.partitionBy("col1","col2","col3").parquet(path)
不幸的是,这没有效果:仍然只有一名 worker ..
注意:我正在运行
local
模式与 local[8]
并且已经看到其他 Spark 操作与多达 8 个并发工作人员一起运行并使用多达 750% 的 CPU。
最佳答案
简而言之,从单个任务写入多个输出文件不是并行化的,但假设您有多个任务(多个输入拆分),每个任务都将在一个 worker 上获得自己的核心。
写出分区数据的目标不是并行化你的写操作。 Spark 已经通过一次同时写出多个任务来做到这一点。目标只是优化 future 的读取操作,您只需要保存数据的一个分区。
Spark 中写入分区的逻辑旨在将前一阶段的所有记录写到目的地时只读取一次。我相信设计选择的一部分也是为了防止出现以下情况
一个分区键有很多很多值。
编辑:Spark 2.x 方法
在 Spark 2.x 中,它按分区键对每个任务中的记录进行排序,然后遍历它们一次写入一个文件句柄。我假设他们这样做是为了确保如果分区键中有很多不同的值,他们永远不会打开大量文件句柄。
作为引用,这里是排序:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L121
向下滚动一点,您会看到它在调用 write(iter.next())
循环遍历每一行。
这是实际的写入(一次一个文件/分区键):
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala#L121
在那里你可以看到它一次只打开一个文件句柄。
编辑:Spark 1.x 方法
spark 1.x 对给定任务所做的是循环遍历所有记录,每当遇到新的输出分区时打开一个新的文件句柄,它以前从未见过此任务。然后它立即将记录写入该文件句柄并转到下一个。这意味着在任何给定时间处理单个任务时,最多可以为该任务打开 N 个文件句柄,其中 N 是最大输出分区数。为了更清楚,这里有一些 python 伪代码来展示总体思路:
# To write out records in a single InputSplit/Task
handles = {}
for row in input_split:
partition_path = determine_output_path(row, partition_keys)
if partition_path not in handles:
handles[partition_path] = open(partition_path, 'w')
handles[partition_path].write(row)
上述写出记录的策略有一个警告。在 spark 1.x 中设置
spark.sql.sources.maxConcurrentWrites
对每个任务可以打开的掩码文件句柄设置上限。达到这一点后,Spark 将改为按分区键对数据进行排序,因此它可以遍历记录,一次写出一个文件。
关于scala - 保存到分区的 Parquet 文件时实现并发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51050272/