apache-spark - Pyspark 作业陷入最终任务

标签 apache-spark pyspark apache-spark-sql hadoop-yarn

我的程序流程是这样的:

1. 将 40 亿行 (~700GB) 数据从 parquet 文件读取到数据帧中。使用的分区大小为 2296

2.清理并过滤掉25亿行

3. 使用管道模型和经过训练的模型转换剩余的 15 亿行。该模型使用逻辑回归模型进行训练,其中预测 0 或 1,并且 30% 的数据从转换后的数据框中过滤掉。

4. 上述数据框与另一个约 1 TB 的数据集进行左外连接(也从 parquet 文件读取)。分区大小为 4000

5. 将其与另一个大约 100 MB 的数据集连接起来,例如

joined_data = data1.join(broadcast(small_dataset_100MB), data1.field ==small_dataset_100MB.field, "left_outer")

6. 然后将上述数据框分解为约 2000 倍

exploded_data = join_data.withColumn('field',explode('field_list'))

7. 执行聚合

aggregate =exploded_data.groupBy(*cols_to_select)\ .agg(F.countDistinct(exploded_data.field1).alias('distincts'), F.count("*").alias('count_all'))

有一个cols_to_select 列表中共有 10 列。

8. 最后执行一个操作,aggregate.count()

问题是,倒数第三个计数阶段(200 个任务)永远卡在任务 199 处。尽管分配了 4 个核心和 56 个执行器,但计数仅使用 1 个核心和 1 个执行器来运行作业。我尝试将大小从 40 亿行分解为 7 亿行,即 1/6 部分,花了四个小时。我非常感谢您提供有关如何加快此过程的帮助,谢谢

最佳答案

由于将倾斜的数据加入到庞大的数据集中,该操作陷入了最终任务。连接两个数据帧的 key 严重倾斜。通过从数据框中删除倾斜的数据,该问题暂时得到解决。如果必须包含倾斜数据,则可以使用迭代广播连接 ( https://github.com/godatadriven/iterative-broadcast-join )。观看此内容丰富的视频以了解更多详细信息 https://www.youtube.com/watch?v=6zg7NTw-kTQ

关于apache-spark - Pyspark 作业陷入最终任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47822036/

相关文章:

hadoop - 通过在随机播放之前在每个分区上运行reduce函数,可以优化Spark ReduceByKey函数吗?

apache-spark - 在 Pyspark 中使用 maxBytesPerTrigger 的正确方法是什么?

excel - inferSchema using spark.read.format ("com.crealytics.spark.excel")正在为日期类型列推断 double

apache-spark - 在独立集群中的 Spark 中加载文件

python - PySpark:org.apache.spark.sql.AnalysisException:属性名称...包含 ",;{}()\n\t="中的无效字符。请使用别名重命名它

java - 使用 withColumn 和 callUDF 将列附加到数据框

java - 如何在java中展平spark数据集中的包装数组

java - AWS EMR 5.20 和 Java 版本支持

python - 使用 PySpark 将 JSON 文件读取为 Pyspark Dataframe?

python - 使用 Windows 的 PySpark 多列