python - 将 cache() 和 count() 应用于 Databricks 中的 Spark Dataframe 非常慢 [pyspark]

标签 python apache-spark pyspark azure-databricks

我在 Databricks 集群中有一个包含 500 万行的 spark 数据框。我想要的是缓存这个 spark 数据帧,然后应用 .count() 以便下一个操作运行得非常快。我过去曾用 20,000 行完成过它并且它有效。然而,在尝试这样做时,我遇到了以下悖论:

数据框创建

第 1 步:从 Azure Data Lake 存储帐户读取 800 万行

read_avro_data=spark.read.format("avro").load(list_of_paths) #list_of_paths[0]='abfss://storage_container_name@storage_account_name.dfs.core.windows.net/folder_1/folder_2/0/2020/06/02/00/00/27.avro'
avro_decoded=read_avro_data.withColumn('Body_decoded', sql_function.decode(read_avro_data.Body, charset="UTF-8")).select("Body_decoded")
datalake_spark_dataframe=datalake_spark_dataframe.union(avro_decoded.withColumn("Body_decoded", sql_function.from_json("Body_decoded", schema)).select(*['Body_decoded.{}'.format(x) for x in columns_selected]))

datalake_spark_dataframe.printSchema()
"root
 |-- id: string (nullable = true)
 |-- BatteryPercentage: float (nullable = true)
 |-- SensorConnected: integer (nullable = false)
 |-- TemperatureOutside: float (nullable = true)
 |-- ReceivedOn: string (nullable = true)"

datalake_spark_dataframe.rdd.getNumPartitions() # 635 partitions

此数据框有 800 万行。我的应用程序有 800 万行,运行良好,但我想在大数据环境中对我的应用程序进行压力测试。因为 800 万行不是大数据。因此,我将 800 万行 Spark Dataframe 复制了 287 次 ~ 22 亿行。为了进行复制,我执行了以下操作:

第 2 步:复制 800 万行数据框

datalake_spark_dataframe_new=datalake_spark_dataframe
for i in range(287):
  print(i)
  datalake_spark_dataframe_new=datalake_spark_dataframe_new.union(datalake_spark_dataframe)
  print("done on iteration: {0}".format(i))

datalake_spark_dataframe_new.rdd.getNumPartitions() #182880

有了最后的 22 亿行数据框,我为我的数据创建了一个时间窗口 GroupBy,最终得到了数百万行。我在问题的顶部写了大约分组数据集有 500 万行。

第 3 步:按 6 小时的时间窗口对 22 亿行数据帧进行分组并应用 .cache() 和 .count()

%sql set spark.sql.shuffle.partitions=100
import pyspark.sql.functions as sql_function
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, BooleanType, DateType, DoubleType, ArrayType

datalake_spark_dataframe_downsampled=datalake_spark_dataframe_new.withColumn(timestamp_column, sql_function.to_timestamp(timestamp_column, "yyyy-MM-dd HH:mm"))
datalake_spark_dataframe_downsampled=datalake_spark_dataframe_downsampled.groupBy("id", sql_function.window("ReceivedOn","{0} minutes".format(time_interval)))\
                                                                         .agg(
                                                                              sql_function.mean("BatteryPercentage").alias("BatteryPercentage"),
                                                                              sql_function.mean("SensorConnected").alias("OuterSensorConnected"),
                                                                              sql_function.mean("TemperatureOutside").alias("averageTemperatureOutside"))

columns_to_drop=['window']
datalake_spark_dataframe_downsampled=datalake_spark_dataframe_downsampled.drop(*columns_to_drop)

# From 2.2 billion rows down to 5 million rows after the GroupBy...
datalake_spark_dataframe_downsampled.repartition(100)
datalake_spark_dataframe_downsampled.cache()
datalake_spark_dataframe_downsampled.count() # job execution takes for ever

datalake_spark_dataframe_downsampled.rdd.getNumPartitions() #100 after re-partition

显示 .count() 之前的 Spark UI enter image description here

计数执行期间的 Spark UI enter image description here

当我将以下命令应用于我的 spark Dataframe 时,它​​需要 3 个多小时才能完成此任务,但最终失败了。

我想补充一点,在重新分区之前和之后,作业在时间执行上具有相同的行为。因此,我进行了重新分区,以防默认值使作业运行非常缓慢。因此,我一直在添加分区,以防作业执行得更快。

%sql set spark.sql.shuffle.partitions=1000000
datalake_spark_dataframe_downsampled.repartition(1000000)

datalake_spark_dataframe_downsampled.cache()
datalake_spark_dataframe_downsampled.count()

下面是 spark 作业的输出: enter image description here

我得到的错误:

enter image description here

我的集群资源:

enter image description here

如您所见,这不是 RAM 或 CPU 核心的问题,因为我有很多。 为什么即使在我应用重新分区后作业也只拆分到 5 个阶段?以及如何拆分作业,以便 .cache() 和 .count() 命令根据我的 48 个 vCPU 内核运行得更快?


每个作业执行提供的屏幕截图 在 8000 万行上执行(8m * 10 次迭代 = 80m 行)

enter image description here

最佳答案

我过去在遍历 for 循环时遇到过类似的问题,因为我的迭代是动态的,具体取决于输入组合。

我通过在每次迭代中持久化数据(您可以尝试在 ADLS2 中持久化,或者如果在 On-Prem 中则在 HDFS/Hive 表中持久化)解决了性能问题。在下一次迭代中,再次从该位置读取,联合并再次覆盖相同的位置。网络滞后,效率不高。尽管如此,它还是将执行时间缩短了 10 倍。

可能的原因可能是 Spark Lineage(我相信对于每次迭代,它都会一次又一次地执行所有先前的迭代)。使用覆盖持久化数据可以避免这种情况。 我也尝试了 cache() 和其他选项,但没有帮助我。

编辑#1 尝试这样的事情

datalake_spark_dataframe_new=datalake_spark_dataframe
datalake_spark_dataframe.write.mode("overwrite").option("header", "true").format("parquet").save("abfss://<ADLS_PATH>")
for i in range(287):
  print(i)
  datalake_spark_dataframe_new=spark.read.parquet("abfss://<ADLS_PATH>")
  datalake_spark_dataframe_new.union(datalake_spark_dataframe).write.mode("overwrite").option("header", "true").format("parquet").save("abfss://<ADLS_PATH>")
  print("done on iteration: {0}".format(i))

编辑#2 这应该比以前的版本更有效率,

for i in range(287):
  print(i)
  datalake_spark_dataframe.write.mode("append").option("header", "true").format("parquet").save("abfss://<ADLS_PATH>")
  print("done on iteration: {0}".format(i))

datalake_spark_dataframe_new=spark.read.parquet("abfss://<ADLS_PATH>")

关于python - 将 cache() 和 count() 应用于 Databricks 中的 Spark Dataframe 非常慢 [pyspark],我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62140588/

相关文章:

python - 按列表中第一项分组的列表列表中元素的平均值

python - 在 pyomo 内部调用 scipy.optimize

scala - 如何对 spark 上数据帧上的数据进行非规范化

apache-spark - 如何在 Spark 数据帧上训练神经网络自动编码器 (Keras)

hive - 在pyspark中查询HIVE表

python - 从存储在Python变量中的数据播放声音

python - 检查文本在 python 中是否右对齐

python - 在 YARN 上运行 Spark 作业

apache-spark - PySpark 在广泛的列中获得不同的值

python - AWS Glue 截断 Redshift 表