apache-spark - 在pySpark中使用数字和分类值对两列进行透视

标签 apache-spark pyspark apache-spark-sql jupyter-notebook pivot

我在 pyspark 中有一个这样的数据集:
从集合导入namedtuple

    user_row = namedtuple('user_row', 'id time category value'.split())
    data = [
        user_row(1,1,'speed','50'),
        user_row(1,1,'speed','60'),
        user_row(1,2,'door', 'open'),
        user_row(1,2,'door','open'),
        user_row(1,2,'door','close'),
        user_row(1,2,'speed','75'),
        user_row(2,10,'speed','30'), 
        user_row(2,11,'door', 'open'),
        user_row(2,12,'door','open'),
        user_row(2,13,'speed','50'),
        user_row(2,13,'speed','40')
    ]
    
    user_df = spark.createDataFrame(data)
    user_df.show()
+---+----+--------+-----+
| id|time|category|value|
+---+----+--------+-----+
|  1|   1|   speed|   50|
|  1|   1|   speed|   60|
|  1|   2|    door| open|
|  1|   2|    door| open|
|  1|   2|    door|close|
|  1|   2|   speed|   75|
|  2|  10|   speed|   30|
|  2|  11|    door| open|
|  2|  12|    door| open|
|  2|  13|   speed|   50|
|  2|  13|   speed|   40|
+---+----+--------+-----+
我想要得到的是类似于下面的内容,其中按 id 和时间分组并以类别为中心,如果它是数字,则返回平均值,如果它是分类的,则返回模式。
+---+----+--------+-----+
| id|time|    door|speed|
+---+----+--------+-----+
|  1|   1|    null|   55|
|  1|   2|    open|   75|
|  2|  10|    null|   30|
|  2|  11|    open| null|
|  2|  12|    open| null|
|  2|  13|    null|   45|
+---+----+--------+-----+
我试过了,但对于分类值,它返回空值(我不担心速度列中的空值)
    df = user_df\
    .groupBy('id','time')\
    .pivot('category')\
    .agg(avg('value'))\
    .orderBy(['id', 'time'])\
    
    df.show()

+---+----+----+-----+
| id|time|door|speed|
+---+----+----+-----+
|  1|   1|null| 55.0|
|  1|   2|null| 75.0|
|  2|  10|null| 30.0|
|  2|  11|null| null|
|  2|  12|null| null|
|  2|  13|null| 45.0|
+---+----+----+-----+

最佳答案

你可以做一个额外的枢轴和合并它们。尝试这个。

import pyspark.sql.functions as F
from collections import namedtuple

user_row = namedtuple('user_row', 'id time category value'.split())
data = [
    user_row(1,1,'speed','50'),
    user_row(1,1,'speed','60'),
    user_row(1,2,'door', 'open'),
    user_row(1,2,'door','open'),
    user_row(1,2,'door','close'),
    user_row(1,2,'speed','75'),
    user_row(2,10,'speed','30'), 
    user_row(2,11,'door', 'open'),
    user_row(2,12,'door','open'),
    user_row(2,13,'speed','50'),
    user_row(2,13,'speed','40')
]

user_df = spark.createDataFrame(data)
#%%
#user_df.show()
df = user_df.groupBy('id','time')\
            .pivot('category')\
            .agg(F.avg('value').alias('avg'),F.max('value').alias('max'))\
#%%
expr1= [x for x in df.columns if '_avg' in x]
expr2= [x for x in df.columns if 'max' in x]
expr=zip(expr1,expr2)
#%%
sel_expr= [F.coalesce(x[0],x[1]).alias(x[0].split('_')[0]) for x in expr]
#%%
    
df_final = df.select('id','time',*sel_expr).orderBy('id','time')

df_final.show()
+---+----+----+-----+
| id|time|door|speed|
+---+----+----+-----+
|  1|   1|null| 55.0|
|  1|   2|open| 75.0|
|  2|  10|null| 30.0|
|  2|  11|open| null|
|  2|  12|open| null|
|  2|  13|null| 45.0|
+---+----+----+-----+

关于apache-spark - 在pySpark中使用数字和分类值对两列进行透视,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63061806/

相关文章:

apache-spark - Apache Spark 流与 Spring XD Streams

java - Spark 1.6 DirectFileOutputCommitter

azure - dfR = Spark.read.format ("csv").option ("mode", "FAILFAST").option ("header","true").schema(sch).load(fileName) ---- 不工作

pyspark - 如何通过pyspark将csv文件写入一个文件

python - 在 pyspark 数据框中显示不同的列值

oracle - 如何在 Spark 中使用 Hadoop Credential provider 连接到 Oracle 数据库?

scala - Spark 驱动程序被 master 解除关联并删除

google-cloud-platform - 在 dataproc 上使用 PEX 环境打包 PySpark

java - 如何使用 java api 在 Apache Spark 数据集中按 desc 排序?

apache-spark - UDF 将单词映射到 Spark 中的术语索引