pyspark - 如何在pyspark中的groupBy之后获得每个计数的总数百分比?

标签 pyspark

鉴于以下数据帧:

import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test").getOrCreate()
df = spark.createDataFrame([['a',1],['b', 2],['a', 3]], ['category', 'value'])
df.show()


+--------+-----+
|category|value|
+--------+-----+
|       a|    1|
|       b|    2|
|       a|    3|
+--------+-----+

我想计算每个类别中的项目数量并为每个计数提供一个百分比,就像这样
+--------+-----+----------+
|category|count|percentage|
+--------+-----+----------+
|       b|    1|     0.333|
|       a|    2|     0.667|
+--------+-----+----------+

最佳答案

您可以通过以下方式获得总数的计数和百分比/比率

import pyspark.sql.functions as f
from pyspark.sql.window import Window
df.groupBy('category').count()\
  .withColumn('percentage', f.round(f.col('count') / f.sum('count')\
  .over(Window.partitionBy()),3)).show()

+--------+-----+----------+
|category|count|percentage|
+--------+-----+----------+
|       b|    1|     0.333|
|       a|    2|     0.667|
+--------+-----+----------+

前面的语句可以分为几个步骤。 df.groupBy('category').count()产生 count :
+--------+-----+
|category|count|
+--------+-----+
|       b|    1|
|       a|    2|
+--------+-----+

然后通过应用窗口函数,我们可以获得每行的总数:
df.groupBy('category').count().withColumn('total', f.sum('count').over(Window.partitionBy())).show()

+--------+-----+-----+
|category|count|total|
+--------+-----+-----+
|       b|    1|    3|
|       a|    2|    3|
+--------+-----+-----+

哪里total column 是通过将分区(包括所有行的单个分区)中的所有计数加在一起来计算的。

一旦我们有了 counttotal对于每一行,我们可以计算比率:
df.groupBy('category')\
  .count()\
  .withColumn('total', f.sum('count').over(Window.partitionBy()))\
  .withColumn('percentage',f.col('count')/f.col('total'))\
  .show()

+--------+-----+-----+------------------+
|category|count|total|        percentage|
+--------+-----+-----+------------------+
|       b|    1|    3|0.3333333333333333|
|       a|    2|    3|0.6666666666666666|
+--------+-----+-----+------------------+

关于pyspark - 如何在pyspark中的groupBy之后获得每个计数的总数百分比?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52264844/

相关文章:

json - Pyspark:从涉及数组列的 Json 架构创建架构

Pyspark - 如何从 DataFrame 列中获取随机值

apache-spark - 按计数对collect_set进行排序

hadoop - 如何将 PySpark worker 中的 numpy 数组保存到 HDFS 或共享文件系统?

python-3.x - 扁平化pyspark中的Maptype列

pandas - PySpark 数据帧 Pandas UDF 返回空数据帧

json - pyspark 将新的嵌套数组添加到现有的 json 文件中

python - 如何在 Spark 中使用 Dataframes 的相关性?

python - pyspark 状态报告之间的时间

amazon-web-services - AWS Glue - 是否使用爬网程序