apache-spark - 按聚合计数进行窗口分区

标签 apache-spark apache-spark-sql

我想对窗口进行计数。聚合的计数结果应该存储在一个新的列中:

输入数据框:

    val df = Seq(("N1", "M1","1"),("N1", "M1","2"),("N1", "M2","3")).toDF("NetworkID", "Station","value")

    +---------+-------+-----+
    |NetworkID|Station|value|
    +---------+-------+-----+
    |       N1|     M1|    1|
    |       N1|     M1|    2|
    |       N1|     M2|    3|
    +---------+-------+-----+

    val w = Window.partitionBy(df("NetworkID"))

目前我得到的结果:

        df.withColumn("count", count("Station").over(w)).show()
        +---------+-------+-----+-----+
        |NetworkID|Station|value|count|
        +---------+-------+-----+-----+
        |       N1|     M2|    3|    3|
        |       N1|     M1|    1|    3|
        |       N1|     M1|    2|    3|
        +---------+-------+-----+-----+

我想要的结果:

+---------+-------+-----+-----+

|NetworkID|Station|value|count|

+---------+-------+-----+-----+

|       N1|     M2|    3|    2|

|       N1|     M1|    1|    2|

|       N1|     M1|    2|    2|

+---------+-------+-----+-----+

因为 NetworkID N1 的站数等于 2(M1 和 M2)。

我知道我可以通过创建一个新的数据框来做到这一点,选择 2 列 NetworkID 和 Station 并执行 groupBy 并加入第一个。

但是我需要对数据框的不同列进行大量汇总计数,因此我必须避免连接。

提前致谢

最佳答案

您还需要在“Station”列上使用 partitionBy,因为您要为每个 NetworkID 计算 Stations。

scala> val df = Seq(("N1", "M1","1"),("N1", "M1","2"),("N1", "M2","3"),("N2", "M1", "4"), ("N2", "M2", "2")).toDF("NetworkID", "Station", "value")
df: org.apache.spark.sql.DataFrame = [NetworkID: string, Station: string ... 1 more field]

scala> val w = Window.partitionBy("NetworkID", "Station")
w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@5b481d77

scala> df.withColumn("count", count("Station").over(w)).show()
+---------+-------+-----+-----+
|NetworkID|Station|value|count|
+---------+-------+-----+-----+
|       N2|     M2|    2|    1|
|       N1|     M2|    3|    1|
|       N2|     M1|    4|    1|
|       N1|     M1|    1|    2|
|       N1|     M1|    2|    2|
+---------+-------+-----+-----+

关于apache-spark - 按聚合计数进行窗口分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55265247/

相关文章:

scala - 在 Spark/Scala 中保持本地、测试和生产配置属性的最佳实践

apache-spark - 请求之间 "cache"Spark 数据集的最佳方法是什么?

scala - 根据一个 RDD 中的键过滤另一个 RDD

scala - Apache Spark Scala - Hive 插入到抛出 "too large frame error"

python - 通过将逗号分隔的列的值替换为基于另一个数据框的查找来创建新列

apache-spark - 如何显示已排序的 Dataframe 列名称?

scala - 对数据框中的列(PANCARD)值进行排序

python - 如何有效地为数据框的列名称添加前缀,而无需在 Pyspark 中创建新的数据框?

database - 用 Spark Dataframe 中另一个分类列的平均值替换列的空值

python - PySpark:将 PythonRDD 附加/合并到 PySpark 数据框