python - Transposing a Spark DataFrame from rows to columns in PySpark and appending it to another DataFrame

Tags: python dataframe apache-spark pyspark transpose

I have a Spark DataFrame avg_length_df in PySpark that looks like this:

+----------------+---------+----------+-----------+---------+-------------+----------+
|       id       |        x|         a|          b|        c|      country|     param|
+----------------+---------+----------+-----------+---------+-------------+----------+
|            40.0|      9.0|     5.284|      5.047|    6.405|         13.0|avg_length|
+----------------+---------+----------+-----------+---------+-------------+----------+

I want to transpose it from rows to columns so that it becomes:
+----------+
|avg_length|
+----------+
|      40.0|
|       9.0|
|     5.284|
|     5.047|
|     6.405|
|      13.0|
+----------+

Next, I have a second DataFrame df2:
+----------------+------+
|       col_names|dtypes|
+----------------+------+
|              id|string|
|               x|   int|
|               a|string|
|               b|string|
|               c|string|
|         country|string|
+----------------+------+

I want to add a column avg_length to df2 whose values come from the transposed DataFrame above, so the expected output looks like:
+----------------+------+----------+
|       col_names|dtypes|avg_length|
+----------------+------+----------+
|              id|string|      40.0|
|               x|   int|       9.0|
|               a|string|     5.284|
|               b|string|     5.047|
|               c|string|     6.405|
|         country|string|      13.0|
+----------------+------+----------+

How can I accomplish these two operations?

Best Answer

>>> from pyspark.sql import *
>>> from pyspark.sql.functions import *  # needed for concat_ws, collect_list, to_json, struct, explode, split, regexp_replace, col
# Input DataFrame
>>> df.show()
+----+---+-----+-----+-----+-------+----------+
|  id|  x|    a|    b|    c|country|     param|
+----+---+-----+-----+-----+-------+----------+
|40.0|9.0|5.284|5.047|6.405|   13.0|avg_length|
+----+---+-----+-----+-----+-------+----------+

>>> avgDF = (df.groupBy("id", "x", "a", "b", "c", "country")
...            .pivot("param")
...            .agg(concat_ws("", collect_list(to_json(struct("id", "x", "a", "b", "c", "country")))))
...            .drop("id", "x", "a", "b", "c", "country"))
>>> avgDF.show(2,False)
+----------------------------------------------------------------------------+
|avg_length                                                                  |
+----------------------------------------------------------------------------+
|{"id":"40.0","x":"9.0","a":"5.284","b":"5.047","c":"6.405","country":"13.0"}|
+----------------------------------------------------------------------------+
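Here pivot("param") promotes the single param value avg_length to a column name, and to_json packs all six fields into one JSON string, so the entire row collapses into a single avg_length cell.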

>>> finalDF = (avgDF
...            .withColumn("value", explode(split(regexp_replace(col("avg_length"), """[\\{ " \\}]""", ""), ",")))
...            .withColumn("avg_length", split(col("value"), ":")[1])
...            .withColumn("col_names", split(col("value"), ":")[0])
...            .drop("value"))
>>> finalDF.show(10,False)
+----------+---------+
|avg_length|col_names|
+----------+---------+
|40.0      |id       |
|9.0       |x        |
|5.284     |a        |
|5.047     |b        |
|6.405     |c        |
|13.0      |country  |
+----------+---------+
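regexp_replace strips the braces, quotes, and spaces from that JSON string, split(..., ",") breaks it into key:value pairs, and explode gives each pair its own row; splitting each pair on ":" then separates the column name from its value.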

# The other DataFrame
>>> df2.show()
+---------+------+
|col_names|dtypes|
+---------+------+
|       id|string|
|        x|   int|
|        a|string|
|        b|string|
|        c|string|
|  country|string|
+---------+------+

>>> df2.join(finalDF,"col_names").show(10,False)
+---------+------+----------+
|col_names|dtypes|avg_length|
+---------+------+----------+
|id       |string|40.0      |
|x        |int   |9.0       |
|a        |string|5.284     |
|b        |string|5.047     |
|c        |string|6.405     |
|country  |string|13.0      |
+---------+------+----------+
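Since the input only ever has one row, a simpler alternative is to skip the pivot/JSON round-trip entirely: collect the row to the driver and rebuild it as (col_names, avg_length) pairs. A minimal sketch, assuming a SparkSession named spark and the same df and df2 as above:

>>> row = df.drop("param").first().asDict()   # e.g. {'id': '40.0', 'x': '9.0', ...}
>>> transposed = spark.createDataFrame([(k, str(v)) for k, v in row.items()],
...                                    ["col_names", "avg_length"])
>>> df2.join(transposed, "col_names").show()

This avoids the string surgery on JSON, at the cost of a driver-side collect, which is harmless here because the DataFrame has exactly one row.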

Regarding "python - Transposing a Spark DataFrame from rows to columns in PySpark and appending it to another DataFrame", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/58529216/
