python - 在 Spark RDD 和/或 Spark DataFrames 中 reshape /透视数据

标签 python apache-spark pyspark apache-spark-sql pivot

我有一些以下格式的数据(RDD 或 Spark DataFrame):

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

 rdd = sc.parallelize([('X01',41,'US',3),
                       ('X01',41,'UK',1),
                       ('X01',41,'CA',2),
                       ('X02',72,'US',4),
                       ('X02',72,'UK',6),
                       ('X02',72,'CA',7),
                       ('X02',72,'XX',8)])

# convert to a Spark DataFrame                    
schema = StructType([StructField('ID', StringType(), True),
                     StructField('Age', IntegerType(), True),
                     StructField('Country', StringType(), True),
                     StructField('Score', IntegerType(), True)])

df = sqlContext.createDataFrame(rdd, schema)

我想做的是“ reshape ”数据,将 Country(特别是 US、UK 和 CA)中的某些行转换为列:

ID    Age  US  UK  CA  
'X01'  41  3   1   2  
'X02'  72  4   6   7   

基本上,我需要类似于 Python 的 pivot 工作流程的东西:

categories = ['US', 'UK', 'CA']
new_df = df[df['Country'].isin(categories)].pivot(index = 'ID', 
                                                  columns = 'Country',
                                                  values = 'Score')

我的数据集相当大,所以我不能真正 collect() 并将数据摄取到内存中以在 Python 本身中进行 reshape 。有没有办法在映射 RDD 或 Spark DataFrame 时将 Python 的 .pivot() 转换为可调用函数?任何帮助将不胜感激!

最佳答案

从 Spark 1.6 开始,您可以使用 pivot GroupedData 上的函数并提供聚合表达式。

pivoted = (df
    .groupBy("ID", "Age")
    .pivot(
        "Country",
        ['US', 'UK', 'CA'])  # Optional list of levels
    .sum("Score"))  # alternatively you can use .agg(expr))
pivoted.show()

## +---+---+---+---+---+
## | ID|Age| US| UK| CA|
## +---+---+---+---+---+
## |X01| 41|  3|  1|  2|
## |X02| 72|  4|  6|  7|
## +---+---+---+---+---+

级别可以省略,但如果提供,既可以提高性能,又可以用作内部过滤器。

这种方法仍然比较慢,但肯定优于在 JVM 和 Python 之间手动传递数据。

关于python - 在 Spark RDD 和/或 Spark DataFrames 中 reshape /透视数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30260015/

相关文章:

python - 为什么部分在 View 之外的矩形被绘制为三角形?

python - 齐柏林飞艇 : Scala Dataframe to python

java - SQL错误:java.io.IOException:java.lang.IllegalArgumentException:bucketId超出范围:-1

csv - Spark 2.0读取csv分区数量(PySpark)

python - 如何在 PySpark 中进行嵌套转换

Python 看门狗窗口等待复制完成

python - Keras 自动编码器 : Tying Weights from Encoder To Decoder not working

python - 如何定义周期函数?

scala - 尝试在 Windows 中使用 sc.textFile 加载文件时出错

python - 从数据框中获取列总和,包括 map 列 - PySpark