python - 如何在 PySpark 中分别对多个列进行透视

标签 python apache-spark pyspark pivot multiple-columns

是否有可能在 PySpark 中一次为不同的列创建一个枢轴? 我有一个这样的数据框:

from pyspark.sql import functions as sf
import pandas as pd
sdf = spark.createDataFrame(
    pd.DataFrame([[1, 'str1', 'str4'], [1, 'str1', 'str4'], [1, 'str2', 'str4'], [1, 'str2', 'str5'],
        [1, 'str3', 'str5'], [2, 'str2', 'str4'], [2, 'str2', 'str4'], [2, 'str3', 'str4'],
        [2, 'str3', 'str5']], columns=['id', 'col1', 'col2'])
)
# +----+------+------+
# | id | col1 | col2 |
# +----+------+------+
# |  1 | str1 | str4 |
# |  1 | str1 | str4 |
# |  1 | str2 | str4 |
# |  1 | str2 | str5 |
# |  1 | str3 | str5 |
# |  2 | str2 | str4 |
# |  2 | str2 | str4 |
# |  2 | str3 | str4 |
# |  2 | str3 | str5 |
# +----+------+------+

我想在多个列(“col1”、“col2”、...)上进行旋转,以获得如下所示的数据框:

+----+-----------+-----------+-----------+-----------+-----------+
| id | col1_str1 | col1_str2 | col1_str3 | col2_str4 | col2_str5 |
+----+-----------+-----------+-----------+-----------+-----------+
|  1 |         2 |         2 |         1 |         3 |         3 |
|  2 |         0 |         2 |         2 |         3 |         1 |
+----+-----------+-----------+-----------+-----------+-----------+

我找到了一个有效的解决方案:

sdf_pivot_col1 = (
    sdf
    .groupby('id')
    .pivot('col1')
    .agg(sf.count('id'))
)
sdf_pivot_col2 = (
    sdf
    .groupby('id')
    .pivot('col2')
    .agg(sf.count('id'))
)

sdf_result = (
    sdf
    .select('id').distinct()
    .join(sdf_pivot_col1, on = 'id' , how = 'left')
    .join(sdf_pivot_col2, on = 'id' , how = 'left')
).show()

# +---+----+----+----+----+----+
# | id|str1|str2|str3|str4|str5|
# +---+----+----+----+----+----+
# |  1|   2|   2|   1|   3|   2|
# |  2|null|   2|   2|   3|   1|
# +---+----+----+----+----+----+

但我正在寻找更紧凑的解决方案。

最佳答案

通过@mrjoseph 的链接,我想出了以下解决方案: 它有效,更干净,但我仍然不喜欢连接...

def pivot_udf(df, *cols):
    mydf = df.select('id').drop_duplicates()
    for c in cols:
        mydf = mydf.join(
            df
            .withColumn('combcol',sf.concat(sf.lit('{}_'.format(c)),df[c]))
            .groupby('id.pivot('combcol.agg(sf.count(c)),
            how = 'left', 
            on = 'id'
        )
    return mydf

pivot_udf(sdf, 'col1','col2').show()

+---+---------+---------+---------+---------+---------+
| id|col1_str1|col1_str2|col1_str3|col2_str4|col2_str5|
+---+---------+---------+---------+---------+---------+
|  1|        2|        2|        1|        3|        2|
|  2|     null|        2|        2|        3|        1|
+---+---------+---------+---------+---------+---------+

关于python - 如何在 PySpark 中分别对多个列进行透视,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57145661/

相关文章:

python - UndefinedMetricWarning : No positive samples in y_true, 真正的正值应该是无意义的 UndefinedMetricWarning)

java - Python 是否可以替代在 Java EE 中完成的 Web 应用程序?

python - Py4JJavaError : An error occurred while calling

apache-spark - Spark ;检查元素是否在 collect_list 中

javascript - JSONify 返回奇怪的值

python - 当数据框和元组值匹配时,有条件地从元组值填充数据框行

python - pyspark:根据另一个 RDD 的某些列过滤一个 RDD

apache-spark - 在 pyspark 数据框中复制一列

scala - Spark - 从具有嵌套文件夹的目录中获取特定数据类型的所有文件名

pandas - 尝试为在 Amazon EMR 上运行的 Pyspark 安装 Pandas