I want to group and aggregate a PySpark DataFrame while removing duplicates (keeping the last value) based on another column of this DataFrame.
In short, I would like to apply dropDuplicates to a GroupedData object, so that for each group I keep only one row of some column, dynamically.
Example
For the DataFrame below, a straightforward group aggregation would be:
from pyspark.sql import functions

dataframe = spark.createDataFrame(
    [
        (1, "2020-01-01", 1, 1),
        (2, "2020-01-01", 2, 1),
        (3, "2020-01-02", 1, 1),
        (2, "2020-01-02", 1, 1),
    ],
    ("id", "ts", "feature", "h3"),
).withColumn("ts", functions.col("ts").cast("timestamp"))
# +---+-------------------+-------+---+
# | id| ts|feature| h3|
# +---+-------------------+-------+---+
# | 1|2020-01-01 00:00:00| 1| 1|
# | 2|2020-01-01 00:00:00| 2| 1|
# | 3|2020-01-02 00:00:00| 1| 1|
# | 2|2020-01-02 00:00:00| 1| 1|
# +---+-------------------+-------+---+
aggregated = dataframe.groupby(
    "h3",
    functions.window(
        timeColumn="ts",
        windowDuration="3 days",
        slideDuration="1 day",
    ),
).agg(
    functions.sum("feature")
)
aggregated.show(truncate=False)
which results in the following DataFrame:
+---+------------------------------------------+------------+
|h3 |window |sum(feature)|
+---+------------------------------------------+------------+
|1 |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|3 |
|1 |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|5 |
|1 |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|5 |
|1 |[2020-01-02 00:00:00, 2020-01-05 00:00:00]|2 |
+---+------------------------------------------+------------+
Problem
I would like the aggregation to use only the latest state of each id. In this case, id=2 was updated to feature=1 at ts=2020-01-02 00:00:00, so all aggregations with a base timestamp greater than 2020-01-02 00:00:00 should use only this state of the feature column for id=2. The expected aggregated DataFrame is:
+---+------------------------------------------+------------+
|h3 |window |sum(feature)|
+---+------------------------------------------+------------+
|1 |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|3 |
|1 |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|3 |
|1 |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|3 |
|1 |[2020-01-02 00:00:00, 2020-01-05 00:00:00]|2 |
+---+------------------------------------------+------------+
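For reference, on a plain DataFrame (ignoring the sliding windows, which are the actual difficulty here), the usual "keep the latest row per id" idiom is a row_number over a window partitioned by id; a minimal sketch:

from pyspark.sql import Window

w = Window.partitionBy("id").orderBy(functions.col("ts").desc())
latest = (
    dataframe
    .withColumn("rn", functions.row_number().over(w))  # rn = 1 is the latest ts per id
    .filter("rn = 1")
    .drop("rn")
)
latest.show()

This idiom cannot be applied per sliding window directly, which is exactly the problem.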
How can I do this with pyspark?
Update
I assumed that a MapType column in Spark should not have duplicate keys. Under that assumption, I thought I could aggregate the column by building a map of id -> feature and then just aggregate the map values with sum (or whatever the final aggregation should be). So I did:
aggregated = dataframe.groupby(
    "h3",
    functions.window(
        timeColumn="ts",
        windowDuration="3 days",
        slideDuration="1 day",
    ),
).agg(
    functions.map_from_entries(
        functions.collect_list(
            functions.struct("id", "feature")
        )
    ).alias("id_feature")
)
aggregated.show(truncate=False)
But then I found out that maps can have duplicate keys:
+---+------------------------------------------+--------------------------------+
|h3 |window |id_feature |
+---+------------------------------------------+--------------------------------+
|1 |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|[1 -> 1, 2 -> 2, 3 -> 1, 2 -> 1]|
|1 |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|[1 -> 1, 2 -> 2, 3 -> 1, 2 -> 1]|
|1 |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|[1 -> 1, 2 -> 2] |
|1 |[2020-01-02 00:00:00, 2020-01-05 00:00:00]|[3 -> 1, 2 -> 1] |
+---+------------------------------------------+--------------------------------+
So it did not solve my problem. Instead, I just found another problem: when using the display function in a Databricks notebook, it shows the MapType column without the duplicated keys.
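As an aside, on Spark 3.0+ this map-based idea can be made to work: the spark.sql.mapKeyDedupPolicy configuration (whose 3.0+ default, EXCEPTION, turns duplicate map keys into a runtime error) can be set to LAST_WIN, so that map_from_entries keeps the last value for each duplicate key. A sketch under that assumption, sorting the collected entries ascending by ts so that the latest record per id wins:

# Sketch, assuming Spark 3.0+ with LAST_WIN: duplicate map keys keep the
# last entry instead of raising an error.
spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")
deduped = dataframe.groupby(
    "h3",
    functions.window(
        timeColumn="ts",
        windowDuration="3 days",
        slideDuration="1 day",
    ),
).agg(
    functions.expr("""
        map_from_entries(
            transform(
                sort_array(collect_list(struct(ts, id, feature))),
                x -> struct(x.id, x.feature)
            )
        )
    """).alias("id_feature")
)
# Summing the deduplicated map values then gives the expected result.
deduped.selectExpr(
    "h3", "window",
    "aggregate(map_values(id_feature), 0L, (acc, v) -> acc + v) as sum_feature"
).show(truncate=False)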
Best Answer
Since you are using Spark 2.4+, one approach you can try is the Spark SQL aggregate function, as shown below:
aggregated = dataframe.groupby(
    "h3",
    functions.window(
        timeColumn="ts",
        windowDuration="3 days",
        slideDuration="1 day",
    ),
).agg(
    functions.sort_array(functions.collect_list(
        functions.struct("ts", "id", "feature")
    ), False).alias("id_feature")
)
I added the ts field to the array of structs produced by functions.collect_list and used functions.sort_array to sort the list by ts in descending order (so that, if duplicates exist, the latest record comes first and is kept). In the aggregate function below, the zero_value is set with a named_struct containing two fields: ids (a MapType) caches all processed ids, and total sums a feature only when its id does not already exist in the cached ids:
aggregated.selectExpr("h3", "window", """
    aggregate(
        id_feature,
        /* zero_value */
        (map() as ids, 0L as total),
        /* merge */
        (acc, y) -> named_struct(
            /* add y.id into the ids map */
            'ids', map_concat(acc.ids, map(y.id, 1)),
            /* sum into total only when y.id doesn't exist yet in acc.ids */
            'total', acc.total + IF(acc.ids[y.id] is null, y.feature, 0)
        ),
        /* finish: take only acc.total, discard the acc.ids map */
        acc -> acc.total
    ) as id_feature
""").show()
+---+--------------------+----------+
| h3| window|id_feature|
+---+--------------------+----------+
| 1|[2020-01-01 00:00...| 3|
| 1|[2019-12-31 00:00...| 3|
| 1|[2019-12-30 00:00...| 3|
| 1|[2020-01-02 00:00...| 2|
+---+--------------------+----------+
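As a side note: if Spark 3.1+ is available, the same fold can be written with the Python-side functions.aggregate helper instead of selectExpr. The sketch below is only an assumed equivalent (functions.aggregate was added in 3.1; on 2.4 keep the selectExpr version above); the when guard around map_concat avoids duplicate-key errors under Spark 3's default map key policy:

def merge(acc, y):
    # a NULL lookup means this id has not been seen yet in the fold
    seen = functions.element_at(acc["ids"], y["id"]).isNotNull()
    return functions.struct(
        # only extend the map with unseen ids (avoids duplicate keys)
        functions.when(seen, acc["ids"]).otherwise(
            functions.map_concat(acc["ids"], functions.create_map(y["id"], functions.lit(1)))
        ).alias("ids"),
        # add feature to the running total only for unseen ids
        (acc["total"] + functions.when(seen, functions.lit(0)).otherwise(y["feature"])).alias("total"),
    )

result = aggregated.select(
    "h3",
    "window",
    functions.aggregate(
        "id_feature",
        functions.struct(
            # empty, explicitly typed map of seen ids, plus a running sum
            functions.create_map().cast("map<bigint,int>").alias("ids"),
            functions.lit(0).cast("long").alias("total"),
        ),
        merge,
        lambda acc: acc["total"],
    ).alias("id_feature"),
)
result.show()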
On dataframe - Aggregating while removing duplicates in pyspark, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/60608544/