apache-spark - pyspark dataframe使用组来获取多个字段计数

标签 apache-spark pyspark apache-spark-sql

我有如下所示的按区域划分的客户数据源

region,source,consumer_id
APAC,mail,1
APAC,referral,2
APAC,mail,3
APAC,referral,5
APAC,mail,6
APAC,referral,7
APAC,referral,8
US East,mail,9
US East,referral,10
US East,walkIn,11
AUS,walkIn,12
AUS,referral,13

有人可以帮助使用 pyspark 数据帧获取区域明智的源计数,如下所示。

region,mail_source_cnt, referral_source_cnt, walkIn_source_cnt
APAC,3,4,0
US EAST,1,1,1
AUS,0,1,1

感谢帮助

最佳答案

您可以聚合以获取计数并将列旋转为,

>>> from pyspark.sql import functions as F
>>> df.show()
+-------+--------+-----------+
| region|  source|consumer_id|
+-------+--------+-----------+
|   APAC|    mail|          1|
|   APAC|referral|          2|
|   APAC|    mail|          3|
|   APAC|referral|          5|
|   APAC|    mail|          6|
|   APAC|referral|          7|
|   APAC|referral|          8|
|US East|    mail|          9|
|US East|referral|         10|
|US East|  walkIn|         11|
|    AUS|  walkIn|         12|
|    AUS|referral|         13|
+-------+--------+-----------+
>>> df1 = df.groupby('region','source').count()
>>> df1.show()
+-------+--------+-----+
| region|  source|count|
+-------+--------+-----+
|US East|  walkIn|    1|
|    AUS|  walkIn|    1|
|   APAC|    mail|    3|
|   APAC|referral|    4|
|US East|    mail|    1|
|US East|referral|    1|
|    AUS|referral|    1|
+-------+--------+-----+
>>> df2 = df1.withColumn('ccol',F.concat(df1['source'],F.lit('_cnt'))).groupby('region').pivot('ccol').agg(F.first('count')).fillna(0)
>>> df2.show()
+-------+--------+------------+----------+
| region|mail_cnt|referral_cnt|walkIn_cnt|
+-------+--------+------------+----------+
|    AUS|       0|           1|         1|
|   APAC|       3|           4|         0|
|US East|       1|           1|         1|
+-------+--------+------------+----------+

关于apache-spark - pyspark dataframe使用组来获取多个字段计数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49671240/

相关文章:

python - 分析异常: Using PythonUDF in join condition of join type LeftSemi is not supported

python - python (4.2) python 2.7.14 : ensure that workers are registered and have sufficient resources

apache-spark - SPARK - 为什么 Spark 作业持续时间不等于每个阶段持续时间的总和?

java - 如何在 Spark sql 中舍入值

scala - 使用 Scala 在不同数据帧的列之间进行计算,其中包含类似 for 循环的内容

scala - 如何从 Scala 的 Glue Job 中的 S3 文件创建动态数据框?

python - Pyspark 依赖任何 ID 的任何滑动窗口

apache-spark - 如何从自定义类 Person 创建数据集?

scala - DataFrame API 如何依赖 Spark 中的 RDD?

python - 在从其他列(Spark)派生的数据框中添加新列