python - pyspark 聚合,同时找到组的第一个值

标签 python apache-spark pyspark apache-spark-sql

假设我有 5 TB 具有以下架构的数据,并且我正在使用 Pyspark。

| id | date | Month | KPI_1 | ... | KPI_n

对于 90% 的 KPI,我只需要知道总和/最小值/最大值聚合到 (id, Month) 级别。对于剩下的 10%,我需要知道基于日期的第一个值。

我的一个选择是使用 window。例如,我可以这样做

from pyspark.sql import Window
import pyspark.sql.functions as F

w = Window.partitionBy("id", "Month").orderBy(F.desc("date"))

# for the 90% kpi
agg_df = df.withColumn("kpi_1", F.sum("kpi_1").over(w))
agg_df = agg_df.withColumn("kpi_2", F.max("kpi_2").over(w))
agg_df = agg_df.withColumn("kpi_3", F.min("kpi_3").over(w))
...

# Select last row for each window to get last accumulated sum for 90% kpis and last value for 10% kpi (which is equivalent to first value if ranked ascending). 

# continue process agg_df with filters based on sum/max/min values of 90% KIPs. 

但我不确定如何选择每个窗口的最后一行。有没有人有任何建议,或者是否有更好的聚合方法?

最佳答案

假设我们有这些数据

+---+----------+-------+-----+-----+
| id|      date|  month|kpi_1|kpi_2|
+---+----------+-------+-----+-----+
|  1|2000-01-01|2000-01|    1|  100|
|  1|2000-01-02|2000-01|    2|  200|
|  1|2000-01-03|2000-01|    3|  300|
|  1|2000-01-04|2000-01|    4|  400|
|  1|2000-01-05|2000-01|    5|  500|
|  1|2000-02-01|2000-02|   10|   11|
|  1|2000-02-02|2000-02|   20|   21|
|  1|2000-02-03|2000-02|   30|   31|
|  1|2000-02-04|2000-02|   40|   41|
+---+----------+-------+-----+-----+

我们要计算 kpi_1 的最小值、最大值和总和,并得到每个组的 kpi_2 的最后一个值。

通过idmonth对数据进行分组,可以得到kpi_1的最小值、最大值和总和。使用 Spark >= 3.0.0 max_by可用于获取kpi_2的最新值:

df_avg = df \
    .groupBy("id","month") \
    .agg(F.sum("kpi_1"), F.min("kpi_1"), F.max("kpi_1"), F.expr("max_by(kpi_2, date)"))
df_avg.show()

打印

+---+-------+----------+----------+----------+-------------------+
| id|  month|sum(kpi_1)|min(kpi_1)|max(kpi_1)|max_by(kpi_2, date)|
+---+-------+----------+----------+----------+-------------------+
|  1|2000-02|       100|        10|        40|                 41|
|  1|2000-01|        15|         1|         5|                500|
+---+-------+----------+----------+----------+-------------------+

对于 Spark 版本 < 3.0.0 max_by 不可用,因此获取每个组的最后一个 kpi_2 值更加困难。第一个想法可能是使用聚合函数 first()在降序数据帧上。一个简单的测试给了我正确的结果,但不幸的是文档指出“该函数是非确定性的,因为它的结果取决于行的顺序,这在洗牌后可能是非确定性的”

获取 kpi_2 的最后一个值的更好方法是使用问题中显示的窗口。作为窗口函数row_number()会工作:

w = Window.partitionBy("id", "Month").orderBy(F.desc("date"))
df_first = df.withColumn("row_number", F.row_number().over(w)).where("row_number = 1")\
    .drop("row_number") \
    .select("id", "month", "KPI_2")
df_first.show()

打印

+---+-------+-----+
| id|  month|KPI_2|
+---+-------+-----+
|  1|2000-02|   41|
|  1|2000-01|  500|
+---+-------+-----+

加入第一部分(没有 max_by 列)和第二部分给出所需的结果:

df_result = df_avg.join(df_first, ['id', 'month'])
df_result.show()

打印

+---+-------+----------+----------+----------+-----+
| id|  month|sum(kpi_1)|min(kpi_1)|max(kpi_1)|KPI_2|
+---+-------+----------+----------+----------+-----+
|  1|2000-02|       100|        10|        40|   41|
|  1|2000-01|        15|         1|         5|  500|
+---+-------+----------+----------+----------+-----+

关于python - pyspark 聚合,同时找到组的第一个值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62181157/

相关文章:

python - 写一个时如何限制文件大小?

python - SQLAlchemy:具有复合主键的同一个表的多个外键

scala - 如何在 PySpark 中添加自定义 JDBC 方言

python - 如何展平在 PySpark 中使用 zip 转换创建的元组

hadoop - 有什么方法可以通过一个pyspark脚本从10个不同的模式中提取数据?

python - 多处理池map_async的意外行为

python - 如何在条件下在pyspark上创建新列?

scala - 如何读取压缩的 Spark eventLog?

scala - 如何在 Apache Spark 1.0 中构建大型分布式 [稀疏] 矩阵?

apache-spark - PySpark动态创建StructType