scala - 如何在 Spark 中压缩两个(或更多)DataFrame

标签 scala apache-spark dataframe apache-spark-sql

我有两个 DataFrame aba 就像

Column 1 | Column 2
abc      |  123
cde      |  23 

b 就像

Column 1 
1      
2      

我想压缩 ab (甚至更多)DataFrames,它会变成这样:

Column 1 | Column 2 | Column 3
abc      |  123     |   1
cde      |  23      |   2

我该怎么做?

最佳答案

DataFrame API 不支持这样的操作。可以zip两个 RDD,但要使其工作,您必须匹配分区数量和每个分区的元素数量。假设情况是这样:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, LongType}

val a: DataFrame = sc.parallelize(Seq(
  ("abc", 123), ("cde", 23))).toDF("column_1", "column_2")
val b: DataFrame = sc.parallelize(Seq(Tuple1(1), Tuple1(2))).toDF("column_3")

// Merge rows
val rows = a.rdd.zip(b.rdd).map{
  case (rowLeft, rowRight) => Row.fromSeq(rowLeft.toSeq ++ rowRight.toSeq)}

// Merge schemas
val schema = StructType(a.schema.fields ++ b.schema.fields)

// Create new data frame
val ab: DataFrame = sqlContext.createDataFrame(rows, schema)

如果不满足上述条件,唯一想到的选择是添加索引和联接:

def addIndex(df: DataFrame) = sqlContext.createDataFrame(
  // Add index
  df.rdd.zipWithIndex.map{case (r, i) => Row.fromSeq(r.toSeq :+ i)},
  // Create schema
  StructType(df.schema.fields :+ StructField("_index", LongType, false))
)

// Add indices
val aWithIndex = addIndex(a)
val bWithIndex = addIndex(b)

// Join and clean
val ab = aWithIndex
  .join(bWithIndex, Seq("_index"))
  .drop("_index")

关于scala - 如何在 Spark 中压缩两个(或更多)DataFrame,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32882529/

相关文章:

scala - 如何在Scala中从一组String中产生一组Char

scala - 从 Idris 到 Scala 的通用加法器?

python - 在 PySpark ML 中创建自定义 Transformer

java - 从插入的 DataSet 获取主键以链接到其他插入

python - 在 Pandas 数据框中将 yes/no 转换为整数类型 1/0(不仅仅是替换)

python - 使用 InterX 函数转换为 python 时出现模块错误

scala - 阿卡 Actor : ask pattern vs Promise

http - 玩框架 WS 设置 cookie

python - PySpark:when子句中的多个条件

python - 在 Pandas 数据框列中存储不同值的最佳方式?