dataframe - 在 pyspark 中删除重复项时进行聚合

标签 dataframe apache-spark pyspark apache-spark-sql databricks

我想分组聚合一个 pyspark 数据帧,同时根据此数据框的另一列删除重复项(保留最后一个值) .

总之,我想将 dropDuplicates 应用于 GroupedData 对象。因此,对于每个组,我只能动态地保留某一列的一行。

例子

对于下面的数据帧,直接的组聚合将是:

from pyspark.sql import functions

dataframe = spark.createDataFrame(
    [
        (1, "2020-01-01", 1, 1),
        (2, "2020-01-01", 2, 1),
        (3, "2020-01-02", 1, 1),
        (2, "2020-01-02", 1, 1)
    ],
    ("id", "ts", "feature", "h3")
).withColumn("ts", functions.col("ts").cast("timestamp"))

# +---+-------------------+-------+---+
# | id|                 ts|feature| h3|
# +---+-------------------+-------+---+
# |  1|2020-01-01 00:00:00|      1|  1|
# |  2|2020-01-01 00:00:00|      2|  1|
# |  3|2020-01-02 00:00:00|      1|  1|
# |  2|2020-01-02 00:00:00|      1|  1|
# +---+-------------------+-------+---+

aggregated = dataframe.groupby("h3",
  functions.window(
    timeColumn="ts",
    windowDuration="3 days",
    slideDuration="1 day",
  )
).agg(
  functions.sum("feature")
)
aggregated.show(truncate=False)

导致以下数据帧:
+---+------------------------------------------+------------+
|h3 |window                                    |sum(feature)|
+---+------------------------------------------+------------+
|1  |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|3           |
|1  |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|5           |
|1  |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|5           |
|1  |[2020-01-02 00:00:00, 2020-01-05 00:00:00]|2           |
+---+------------------------------------------+------------+

问题

我希望聚合仅使用 最新状态每个id .在这种情况下,id=2已更新为 feature=1ts=2020-01-02 00:00:00 ,所以所有基时间戳大于 2020-01-02 00:00:00 的聚合当 id=2 时,应仅将此状态用于列功能.预期的聚合数据框是:
+---+------------------------------------------+------------+
|h3 |window                                    |sum(feature)|
+---+------------------------------------------+------------+
|1  |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|3           |
|1  |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|3           |
|1  |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|3           |
|1  |[2020-01-02 00:00:00, 2020-01-05 00:00:00]|2           |
+---+------------------------------------------+------------+

我怎样才能用 pyspark 做到这一点?

更新

我假设 MapType 变量在 Spark 中不应该有重复的键。有了这个假设,我想我可以聚合列创建 map id -> feature然后只用 sum (或最终聚合应该是什么)聚合 map 值。

所以我做了:

aggregated = dataframe.groupby("h3",
  functions.window(
    timeColumn="ts",
    windowDuration="3 days",
    slideDuration="1 day",
  )
).agg(
  functions.map_from_entries(
    functions.collect_list(
      functions.struct("id","feature")
    )
  ).alias("id_feature")
)
aggregated.show(truncate=False)

但后来我发现 map 可以 有重复的键:
+---+------------------------------------------+--------------------------------+
|h3 |window                                    |id_feature                      |
+---+------------------------------------------+--------------------------------+
|1  |[2020-01-01 00:00:00, 2020-01-04 00:00:00]|[1 -> 1, 2 -> 2, 3 -> 1, 2 -> 1]|
|1  |[2019-12-31 00:00:00, 2020-01-03 00:00:00]|[1 -> 1, 2 -> 2, 3 -> 1, 2 -> 1]|
|1  |[2019-12-30 00:00:00, 2020-01-02 00:00:00]|[1 -> 1, 2 -> 2]                |
|1  |[2020-01-02 00:00:00, 2020-01-05 00:00:00]|[3 -> 1, 2 -> 1]                |
+---+------------------------------------------+--------------------------------+

所以它没有解决我的问题 .相反,我只是发现了另一个问题。在 Databricks 的笔记本中使用显示功能时,it shows the MapType column without duplicated keys .

最佳答案

由于您使用的是 Spark 2.4+,您可以尝试的一种方法是使用 Spark SQL aggregate功能,见下图:

aggregated = dataframe.groupby("h3",
   functions.window( 
     timeColumn="ts", 
     windowDuration="3 days", 
     slideDuration="1 day", 
   ) 
 ).agg( 
     functions.sort_array(functions.collect_list( 
       functions.struct("ts", "id", "feature") 
     ), False).alias("id_feature") 
 )   

我加了 ts字段到来自functions.collect_list的结构体数组。使用functions.sort_array按ts对列表进行排序按降序排列(如果存在重复,则保留最新记录)。在下面的聚合函数中,我们使用包含两个字段的named_struct 设置zero_value:ids (MapType) 缓存所有处理过的id 和total 仅在缓存中不存在新id 时进行求和ids .
aggregated.selectExpr("h3", "window", """
  aggregate(
    id_feature,
    /* zero_value */
    (map() as ids, 0L as total), 
    /* merge */
    (acc, y) -> named_struct(
      /* add y.id into the ids map */
      'ids', map_concat(acc.ids, map(y.id,1)), 
      /* sum to total only when y.id doesn't exist in acc.ids map */
      'total', acc.total + IF(acc.ids[y.id] is null,y.feature,0)
    ), 
    /* finish, take only acc.total, discard acc.ids map */
    acc -> acc.total
  ) as id_features

""").show()
+---+--------------------+----------+
| h3|              window|id_feature|
+---+--------------------+----------+
|  1|[2020-01-01 00:00...|         3|
|  1|[2019-12-31 00:00...|         3|
|  1|[2019-12-30 00:00...|         3|
|  1|[2020-01-02 00:00...|         2|
+---+--------------------+----------+

关于dataframe - 在 pyspark 中删除重复项时进行聚合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60608544/

相关文章:

python - 如何组合 pandas 数据框中的两列并为其设置值?

python - 将键/值对的 Pyspark RDD 解析为 .csv 格式

Scala Spark 将多列对分解为行

amazon-web-services - AWS Glue Python 作业未创建新的数据目录分区

python - Pyspark:在驱动程序和工作人员上使用 ffmpeg

r - 当两个数据框具有不同的列集时,按行合并两个数据框(rbind)

python - pandas:类型转换返回错误的值

R data.frame 因子而不是级别

hadoop - spark1.3无法从HDFS1读取数据

apache-spark - Spark 独立集群 - 从站未连接到主站