I want to compute a count over a window, and the aggregated count should be stored in a new column:
Input DataFrame:
val df = Seq(("N1", "M1","1"),("N1", "M1","2"),("N1", "M2","3")).toDF("NetworkID", "Station","value")
+---------+-------+-----+
|NetworkID|Station|value|
+---------+-------+-----+
| N1| M1| 1|
| N1| M1| 2|
| N1| M2| 3|
+---------+-------+-----+
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.count

val w = Window.partitionBy(df("NetworkID"))
The result I currently get:
df.withColumn("count", count("Station").over(w)).show()
+---------+-------+-----+-----+
|NetworkID|Station|value|count|
+---------+-------+-----+-----+
| N1| M2| 3| 3|
| N1| M1| 1| 3|
| N1| M1| 2| 3|
+---------+-------+-----+-----+
The result I want:
+---------+-------+-----+-----+
|NetworkID|Station|value|count|
+---------+-------+-----+-----+
| N1| M2| 3| 2|
| N1| M1| 1| 2|
| N1| M1| 2| 2|
+---------+-------+-----+-----+
because the number of stations for NetworkID N1 is equal to 2 (M1 and M2).
I know I could do this by creating a new DataFrame with just the two columns NetworkID and Station, doing a groupBy, and joining the result back onto the first one (as sketched below).
But I need aggregated counts like this over many different columns of the DataFrame, so I have to avoid joins.
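For reference, a minimal sketch of that join-based approach, assuming the df defined above (the stationCounts name is just illustrative):

import org.apache.spark.sql.functions.countDistinct

// Distinct-station count per NetworkID, computed as a separate DataFrame...
val stationCounts = df
  .select("NetworkID", "Station")
  .groupBy("NetworkID")
  .agg(countDistinct("Station").as("count"))

// ...then joined back onto the original DataFrame on NetworkID.
df.join(stationCounts, Seq("NetworkID")).show()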
Thanks in advance.
Best answer
You need to partition by the "Station" column as well, since you are counting Stations for each NetworkID.
scala> val df = Seq(("N1", "M1","1"),("N1", "M1","2"),("N1", "M2","3"),("N2", "M1", "4"), ("N2", "M2", "2")).toDF("NetworkID", "Station", "value")
df: org.apache.spark.sql.DataFrame = [NetworkID: string, Station: string ... 1 more field]
scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> val w = Window.partitionBy("NetworkID", "Station")
w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@5b481d77
scala> df.withColumn("count", count("Station").over(w)).show()
+---------+-------+-----+-----+
|NetworkID|Station|value|count|
+---------+-------+-----+-----+
| N2| M2| 2| 1|
| N1| M2| 3| 1|
| N2| M1| 4| 1|
| N1| M1| 1| 2|
| N1| M1| 2| 2|
+---------+-------+-----+-----+
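Note that this window counts rows per (NetworkID, Station) pair, which is not quite the per-NetworkID distinct-station count shown in the wanted output. If that distinct count is the goal, one join-free option is a sketch like the following (not part of the original answer; countDistinct is not supported over a window, so it collects the set of stations and takes its size, assuming Spark 2.0+):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{collect_set, size}

// Number of distinct Stations per NetworkID, as a window aggregate (no join).
val byNetwork = Window.partitionBy("NetworkID")
df.withColumn("count", size(collect_set("Station").over(byNetwork))).show()

On the three-row example from the question, this would give count = 2 on every N1 row.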
Regarding apache-spark - window partitioning with an aggregate count, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/55265247/