dataframe - How to pivot a Spark DataFrame?

Tags: dataframe apache-spark pyspark apache-spark-sql pivot

I am starting to use Spark DataFrames and I need to be able to pivot the data to create multiple columns out of one column with multiple rows. There is built-in functionality for this in Scalding, and I believe in Pandas in Python as well, but I can't find anything for the new Spark DataFrame.

I assume I can write a custom function of some sort that will do this, but I'm not even sure how to start, especially since I am a novice with Spark. If anyone knows how to do this with built-in functionality, or has suggestions for how to write something in Scala, it would be greatly appreciated.

Best Answer

As mentioned by David Anderson, Spark has provided a pivot function since version 1.6. The general syntax is as follows:

df
  .groupBy(grouping_columns)
  .pivot(pivot_column, [values]) 
  .agg(aggregate_expressions)
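
For instance, here is a minimal self-contained Python sketch of that syntax; the local SparkSession and the toy shop/quarter DataFrame are made up purely for illustration:

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as sum_

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Toy data: one row per (shop, quarter) pair.
df = spark.createDataFrame(
    [("A", "Q1", 10), ("A", "Q2", 20), ("B", "Q1", 5)],
    ["shop", "quarter", "amount"])

# Each distinct value of `quarter` becomes its own column.
df.groupBy("shop").pivot("quarter").agg(sum_("amount")).show()
## +----+---+----+
## |shop| Q1|  Q2|
## +----+---+----+
## |   A| 10|  20|
## |   B|  5|null|
## +----+---+----+
## (row order may vary)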

Example usage with the nycflights13 data in csv format:

Python:

from pyspark.sql.functions import avg

flights = (sqlContext
    .read
    .format("csv")
    .options(inferSchema="true", header="true")
    .load("flights.csv")
    .na.drop())

flights.registerTempTable("flights")
sqlContext.cacheTable("flights")

gexprs = ("origin", "dest", "carrier")
aggexpr = avg("arr_delay")

flights.count()
## 336776

%timeit -n10 flights.groupBy(*gexprs).pivot("hour").agg(aggexpr).count()
## 10 loops, best of 3: 1.03 s per loop

Scala:

val flights = sqlContext
  .read
  .format("csv")
  .options(Map("inferSchema" -> "true", "header" -> "true"))
  .load("flights.csv")

flights
  .groupBy($"origin", $"dest", $"carrier")
  .pivot("hour")
  .agg(avg($"arr_delay"))

Java:

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.*;

Dataset<Row> df = spark.read().format("csv")
        .option("inferSchema", "true")
        .option("header", "true")
        .load("flights.csv");

df.groupBy(col("origin"), col("dest"), col("carrier"))
        .pivot("hour")
        .agg(avg(col("arr_delay")));

R/SparkR:

library(magrittr)

flights <- read.df("flights.csv", source="csv", header=TRUE, inferSchema=TRUE)

flights %>% 
  groupBy("origin", "dest", "carrier") %>% 
  pivot("hour") %>% 
  agg(avg(column("arr_delay")))

R/sparklyr:

library(dplyr)

flights <- spark_read_csv(sc, "flights", "flights.csv")

avg.arr.delay <- function(gdf) {
  expr <- invoke_static(
    sc,
    "org.apache.spark.sql.functions",
    "avg",
    "arr_delay"
  )
  gdf %>% invoke("agg", expr, list())
}

flights %>%
  sdf_pivot(origin + dest + carrier ~ hour, fun.aggregate = avg.arr.delay)

SQL:

Note that the PIVOT keyword in Spark SQL is supported as of version 2.4.

CREATE TEMPORARY VIEW flights
USING csv
OPTIONS (header 'true', path 'flights.csv', inferSchema 'true');

SELECT * FROM (
  SELECT origin, dest, carrier, arr_delay, hour FROM flights
) PIVOT (
  avg(arr_delay)
  FOR hour IN (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12,
               13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23)
);

Sample data:

"year","month","day","dep_time","sched_dep_time","dep_delay","arr_time","sched_arr_time","arr_delay","carrier","flight","tailnum","origin","dest","air_time","distance","hour","minute","time_hour"
2013,1,1,517,515,2,830,819,11,"UA",1545,"N14228","EWR","IAH",227,1400,5,15,2013-01-01 05:00:00
2013,1,1,533,529,4,850,830,20,"UA",1714,"N24211","LGA","IAH",227,1416,5,29,2013-01-01 05:00:00
2013,1,1,542,540,2,923,850,33,"AA",1141,"N619AA","JFK","MIA",160,1089,5,40,2013-01-01 05:00:00
2013,1,1,544,545,-1,1004,1022,-18,"B6",725,"N804JB","JFK","BQN",183,1576,5,45,2013-01-01 05:00:00
2013,1,1,554,600,-6,812,837,-25,"DL",461,"N668DN","LGA","ATL",116,762,6,0,2013-01-01 06:00:00
2013,1,1,554,558,-4,740,728,12,"UA",1696,"N39463","EWR","ORD",150,719,5,58,2013-01-01 05:00:00
2013,1,1,555,600,-5,913,854,19,"B6",507,"N516JB","EWR","FLL",158,1065,6,0,2013-01-01 06:00:00
2013,1,1,557,600,-3,709,723,-14,"EV",5708,"N829AS","LGA","IAD",53,229,6,0,2013-01-01 06:00:00
2013,1,1,557,600,-3,838,846,-8,"B6",79,"N593JB","JFK","MCO",140,944,6,0,2013-01-01 06:00:00
2013,1,1,558,600,-2,753,745,8,"AA",301,"N3ALAA","LGA","ORD",138,733,6,0,2013-01-01 06:00:00

Performance considerations:

Generally speaking, pivoting is an expensive operation. If the set of pivot values is known in advance, pass it explicitly as the second argument to pivot: this spares Spark an extra job that it would otherwise launch just to compute the distinct values of the pivot column (see the sketch below).
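
A short Python sketch of this, reusing the flights DataFrame from above; the hours list here is an assumption that the hour column only takes values 0 through 23:

from pyspark.sql.functions import avg

# Passing the pivot values up front skips the extra job Spark would
# otherwise run just to collect the distinct values of `hour`.
hours = list(range(24))

(flights
    .groupBy("origin", "dest", "carrier")
    .pivot("hour", hours)
    .agg(avg("arr_delay")))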

Related question:

Regarding "dataframe - How to pivot a Spark DataFrame?", there is a similar question on Stack Overflow: https://stackoverflow.com/questions/30244910/
