scala - Renaming and optimizing multiple pivoted columns in Spark

Tags: scala hadoop apache-spark pyspark

My input data has a set of columns that I pivot on, based on multiple columns.

After the pivot completes, I run into a problem with the column headers.

Input data:

[screenshot of the input data, not reproduced here]

Output produced by my approach:

[screenshot of the generated output, not reproduced here]

Expected output headers:

I need the output headers to look like this:

[screenshot of the expected headers, not reproduced here]

Steps taken so far to produce the output I have:

// Load the data

scala> val input_data = spark.read.option("header","true").option("inferschema","true").option("delimiter","\t").csv("s3://mybucket/data.tsv")

// Filter the data where the residentFlag column = T

scala> val filtered_data = input_data.select("numericID","age","salary","gender","residentFlag").filter($"residentFlag".contains("T"))

// Now pivot the filtered data by each column

scala> val pivotByAge = filtered_data.groupBy("age","numericID").pivot("age").agg(expr("coalesce(first(numericID),'-')")).drop("age")

// Pivot the data by the second column, "salary"

scala> val pivotBySalary = filtered_data.groupBy("salary","numericID").pivot("salary").agg(expr("coalesce(first(numericID),'-')")).drop("salary")

// Join the two dataframes above on numericID

scala> val intermediateDf = pivotByAge.join(pivotBySalary, "numericID")

// Now pivot the filtered data from Step 2 on the third column, "gender"

scala> val pivotByGender = filtered_data.groupBy("gender","numericID").pivot("gender").agg(expr("coalesce(first(numericID),'-')")).drop("gender")

// Join the above dataframe with intermediateDf

scala> val outputDF = pivotByGender.join(intermediateDf, "numericID")

How can I rename the columns generated after pivoting?

Is there a different approach I could take to pivot the dataset on multiple columns (nearly 300)?

Any optimizations/suggestions for improving performance?

Best Answer

Consider using foldLeft to traverse the list of to-pivot columns: successively create a pivot DataFrame for each, rename the generated pivot columns, then cumulatively join them:

import org.apache.spark.sql.functions.expr
import spark.implicits._  // assumes a spark-shell / SparkSession context named `spark`

val data = Seq(
  (1, 30, 50000, "M"),
  (1, 25, 70000, "F"),
  (1, 40, 70000, "M"),
  (1, 30, 80000, "M"),
  (2, 30, 80000, "M"),
  (2, 40, 50000, "F"),
  (2, 25, 70000, "F")
).toDF("numericID", "age", "salary", "gender")

// Create list pivotCols consisting of the columns to pivot
val id = data.columns.head
val pivotCols = data.columns.filter(_ != "numericID")

// Create the first pivot dataframe from the first column in list pivotCols and
// rename each of the generated pivot columns
val c1 = pivotCols.head
val df1 = data.groupBy(c1, id).pivot(c1).agg(expr(s"coalesce(first($id),'-')")).drop(c1)
val df1Renamed = df1.columns.tail.foldLeft( df1 )( (acc, x) =>
      acc.withColumnRenamed(x, c1 + "_" + x)
    )

// Using the first pivot dataframe as the initial dataframe, process each of the
// remaining columns in list pivotCols similar to how the first column is processed,
// and cumulatively join each of them with the previously joined dataframe
pivotCols.tail.foldLeft( df1Renamed )(
  (accDF, c) => {
    val df = data.groupBy(c, id).pivot(c).agg(expr(s"coalesce(first($id),'-')")).drop(c)
    val dfRenamed = df.columns.tail.foldLeft( df )( (acc, x) =>
      acc.withColumnRenamed(x, c + "_" + x)
    )
    dfRenamed.join(accDF, Seq(id))
  }
)

// +---------+--------+--------+------------+------------+------------+------+------+------+
// |numericID|gender_F|gender_M|salary_50000|salary_70000|salary_80000|age_25|age_30|age_40|
// +---------+--------+--------+------------+------------+------------+------+------+------+
// |2        |2       |-       |2           |-           |-           |-     |2     |-     |
// |2        |2       |-       |2           |-           |-           |2     |-     |-     |
// |2        |2       |-       |2           |-           |-           |-     |-     |2     |
// |2        |2       |-       |-           |2           |-           |-     |2     |-     |
// |2        |2       |-       |-           |2           |-           |2     |-     |-     |
// |2        |2       |-       |-           |2           |-           |-     |-     |2     |
// |2        |2       |-       |-           |-           |2           |-     |2     |-     |
// |2        |2       |-       |-           |-           |2           |2     |-     |-     |
// |2        |2       |-       |-           |-           |2           |-     |-     |2     |
// |2        |-       |2       |2           |-           |-           |-     |2     |-     |
// |2        |-       |2       |2           |-           |-           |2     |-     |-     |
// |2        |-       |2       |2           |-           |-           |-     |-     |2     |
// |2        |-       |2       |-           |2           |-           |-     |2     |-     |
// |2        |-       |2       |-           |2           |-           |2     |-     |-     |
// |2        |-       |2       |-           |2           |-           |-     |-     |2     |
// |2        |-       |2       |-           |-           |2           |-     |2     |-     |
// |2        |-       |2       |-           |-           |2           |2     |-     |-     |
// |2        |-       |2       |-           |-           |2           |-     |-     |2     |
// |1        |-       |1       |-           |1           |-           |1     |-     |-     |
// |1        |-       |1       |-           |1           |-           |-     |-     |1     |
// ...
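As an alternative to calling withColumnRenamed once per generated column, you can compute the full list of new names up front and rename everything in a single toDF call. A minimal sketch of the name computation alone (plain Scala, no SparkSession needed; the `RenamePivotCols` object and `prefixed` helper are illustrative names, not part of the original answer):

```scala
object RenamePivotCols {
  // Prefix every generated pivot column with its source column name,
  // leaving the id column untouched, e.g. "25" becomes "age_25".
  def prefixed(cols: Seq[String], idCol: String, pivotCol: String): Seq[String] =
    cols.map(c => if (c == idCol) c else s"${pivotCol}_$c")

  def main(args: Array[String]): Unit = {
    val renamed = prefixed(Seq("numericID", "25", "30", "40"), "numericID", "age")
    println(renamed.mkString(","))  // numericID,age_25,age_30,age_40
    // In Spark, all columns can then be renamed in one call:
    //   val df1Renamed = df1.toDF(renamed: _*)
  }
}
```

For a large number of pivot columns (~300), it may also help to cache the source DataFrame before the loop and to pass explicit values to pivot(col, values), which lets Spark skip the extra pass it otherwise makes to collect the distinct values of each pivot column.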

Regarding "scala - Renaming and optimizing multiple pivoted columns in Spark", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/49284397/
