python - Pyspark Dataframe 上的 Pivot String 列

标签 python apache-spark dataframe pyspark apache-spark-sql

我有一个像这样的简单数据框:

rdd = sc.parallelize(
    [
        (0, "A", 223,"201603", "PORT"), 
        (0, "A", 22,"201602", "PORT"), 
        (0, "A", 422,"201601", "DOCK"), 
        (1,"B", 3213,"201602", "DOCK"), 
        (1,"B", 3213,"201601", "PORT"), 
        (2,"C", 2321,"201601", "DOCK")
    ]
)
df_data = sqlContext.createDataFrame(rdd, ["id","type", "cost", "date", "ship"])

df_data.show()
 +---+----+----+------+----+
| id|type|cost|  date|ship|
+---+----+----+------+----+
|  0|   A| 223|201603|PORT|
|  0|   A|  22|201602|PORT|
|  0|   A| 422|201601|DOCK|
|  1|   B|3213|201602|DOCK|
|  1|   B|3213|201601|PORT|
|  2|   C|2321|201601|DOCK|
+---+----+----+------+----+

我需要按日期调整它:

df_data.groupby(df_data.id, df_data.type).pivot("date").avg("cost").show()

+---+----+------+------+------+
| id|type|201601|201602|201603|
+---+----+------+------+------+
|  2|   C|2321.0|  null|  null|
|  0|   A| 422.0|  22.0| 223.0|
|  1|   B|3213.0|3213.0|  null|
+---+----+------+------+------+

一切都按预期进行。但现在我需要旋转它并获得一个非数字列:

df_data.groupby(df_data.id, df_data.type).pivot("date").avg("ship").show()

当然我会得到一个异常(exception):

AnalysisException: u'"ship" is not a numeric column. Aggregation function can only be applied on a numeric column.;'

我想在行上生成一些东西

+---+----+------+------+------+
| id|type|201601|201602|201603|
+---+----+------+------+------+
|  2|   C|DOCK  |  null|  null|
|  0|   A| DOCK |  PORT| DOCK|
|  1|   B|DOCK  |PORT  |  null|
+---+----+------+------+------+

pivot 可以做到吗?

最佳答案

假设 (id |type | date) 组合是唯一的,并且您的唯一目标是旋转而不是聚合,您可以使用 first (或任何其他不限于数值):

from pyspark.sql.functions import first

(df_data
    .groupby(df_data.id, df_data.type)
    .pivot("date")
    .agg(first("ship"))
    .show())

## +---+----+------+------+------+
## | id|type|201601|201602|201603|
## +---+----+------+------+------+
## |  2|   C|  DOCK|  null|  null|
## |  0|   A|  DOCK|  PORT|  PORT|
## |  1|   B|  PORT|  DOCK|  null|
## +---+----+------+------+------+

如果这些假设不正确,您必须预先汇总数据。例如对于最常见的 ship 值:

from pyspark.sql.functions import max, struct

(df_data
    .groupby("id", "type", "date", "ship")
    .count()
    .groupby("id", "type")
    .pivot("date")
    .agg(max(struct("count", "ship")))
    .show())

## +---+----+--------+--------+--------+
## | id|type|  201601|  201602|  201603|
## +---+----+--------+--------+--------+
## |  2|   C|[1,DOCK]|    null|    null|
## |  0|   A|[1,DOCK]|[1,PORT]|[1,PORT]|
## |  1|   B|[1,PORT]|[1,DOCK]|    null|
## +---+----+--------+--------+--------+

关于python - Pyspark Dataframe 上的 Pivot String 列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37486910/

相关文章:

python - scipys ndimage 过滤器的 "reflect"模式究竟是如何工作的?

python - 更改新目录和文件的权限模式

python - 搜索 3d 点以查找是否有连续的点

python - 如果在数据帧列中找到则返回字符串的关键字

python - 单张 : Add the plot legend and select initial zoom

scala - 使用 Spark 中的动态列将 RDD 数据写入 CSV - Scala

apache-spark - 是否有任何带有重复顶点合并功能的 Spark GraphX 构造函数

python - 如何重新格式化 Spark Python 输出

r - 如何创建一个带有字符串和每个重复对应数字的向量

python - 使用 python pandas 对大型 csv 文件的汇总统计