我的程序流程是这样的:
1. 将 40 亿行 (~700GB) 数据从 parquet 文件读取到数据帧中。使用的分区大小为 2296
2.清理并过滤掉25亿行
3. 使用管道模型和经过训练的模型转换剩余的 15 亿行。该模型使用逻辑回归模型进行训练,其中预测 0 或 1,并且 30% 的数据从转换后的数据框中过滤掉。
4. 上述数据框与另一个约 1 TB 的数据集进行左外连接(也从 parquet 文件读取)。分区大小为 4000
5. 将其与另一个大约 100 MB 的数据集连接起来,例如
joined_data = data1.join(broadcast(small_dataset_100MB), data1.field ==small_dataset_100MB.field, "left_outer")
6. 然后将上述数据框分解为约 2000 倍exploded_data = join_data.withColumn('field',explode('field_list'))
7. 执行聚合aggregate =exploded_data.groupBy(*cols_to_select)\
.agg(F.countDistinct(exploded_data.field1).alias('distincts'), F.count("*").alias('count_all'))
有一个cols_to_select
列表中共有 10 列。
8. 最后执行一个操作,aggregate.count()
。
问题是,倒数第三个计数阶段(200 个任务)永远卡在任务 199 处。尽管分配了 4 个核心和 56 个执行器,但计数仅使用 1 个核心和 1 个执行器来运行作业。我尝试将大小从 40 亿行分解为 7 亿行,即 1/6 部分,花了四个小时。我非常感谢您提供有关如何加快此过程的帮助,谢谢
最佳答案
由于将倾斜的数据加入到庞大的数据集中,该操作陷入了最终任务。连接两个数据帧的 key 严重倾斜。通过从数据框中删除倾斜的数据,该问题暂时得到解决。如果必须包含倾斜数据,则可以使用迭代广播连接 ( https://github.com/godatadriven/iterative-broadcast-join )。观看此内容丰富的视频以了解更多详细信息 https://www.youtube.com/watch?v=6zg7NTw-kTQ
关于apache-spark - Pyspark 作业陷入最终任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47822036/