背景:在 Hadoop Streaming 中,每个 reduce 作业在完成时都会写入 hdfs,从而为 Hadoop 集群执行下一个 reduce 扫清道路。
我无法将此范例映射到 (Py)Spark。
举个例子,
df = spark.read.load('path')
df.rdd.reduceByKey(my_func).toDF().write.save('output_path')
当我运行它时,集群会在将任何内容写入磁盘之前收集数据框中的所有数据。至少这就是我观察工作进展时正在发生的事情。
我的问题是我的数据比我的集群内存大得多,所以我在写入任何数据之前就用完了内存。在 Hadoop Streaming 中,我们没有这个问题,因为输出数据被流式传输到磁盘以为后续批处理的数据腾出空间。
我考虑过这样的事情:
for i in range(100):
(df.filter(df.loop_index==i)
.rdd
.reduceByKey(my_func)
.toDF()
.write.mode('append')
.save('output_path'))
我在每次迭代中只处理一部分数据。但这看起来很笨拙,主要是因为我必须坚持 df
,由于内存限制这是不可能的,或者我必须在每次迭代中重新读取输入 hdfs 源。
使循环工作的一种方法是按天或数据的某些其他子集对源文件夹进行分区。但为了这个问题,我们假设这是不可能的。
问题:如何在 PySpark 中运行这样的作业?我只需要有一个更大的集群吗?如果是这样,在处理数据之前调整集群大小的常见做法是什么?
最佳答案
将数据重新划分为大量分区可能会有所帮助。下面的示例类似于您的 for 循环,尽管您可能想先尝试使用较少的分区
df = spark.read.load('path').repartition(100)
您还应该查看当前使用的执行器数量 (--num-executors
)。减少这个数字也应该减少你的内存占用。
关于python - 如何让 PySpark 在内存不足之前将中间结果写入磁盘?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42849609/