I have two DataFrames, a and b. a looks like

Column 1 | Column 2
abc      | 123
cde      | 23

and b looks like

Column 1
1
2

I want to zip a and b (or even more DataFrames) so that the result looks like this:

Column 1 | Column 2 | Column 3
abc      | 123      | 1
cde      | 23       | 2

How can I do this?
Best answer
The DataFrame API does not support an operation like this. It is possible to zip two RDDs, but for that to work you have to match both the number of partitions and the number of elements per partition. Assuming that is the case:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, LongType}

val a: DataFrame = sc.parallelize(Seq(
  ("abc", 123), ("cde", 23))).toDF("column_1", "column_2")
val b: DataFrame = sc.parallelize(Seq(Tuple1(1), Tuple1(2))).toDF("column_3")

// Merge rows
val rows = a.rdd.zip(b.rdd).map {
  case (rowLeft, rowRight) => Row.fromSeq(rowLeft.toSeq ++ rowRight.toSeq)
}

// Merge schemas
val schema = StructType(a.schema.fields ++ b.schema.fields)

// Create a new data frame
val ab: DataFrame = sqlContext.createDataFrame(rows, schema)
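To see what the row-merging step does without needing a Spark cluster, here is a minimal plain-Scala sketch of the same idea (the variable names are illustrative, not part of the original code): zip the two row sequences position by position and concatenate the fields of each pair.

```scala
// Plain-Scala sketch of the zip-and-merge idea (no Spark needed).
// Each inner Seq stands in for a Row.
val a = Seq(Seq("abc", 123), Seq("cde", 23))
val b = Seq(Seq(1), Seq(2))

// Pair rows by position and concatenate their fields,
// mirroring rowLeft.toSeq ++ rowRight.toSeq above.
val merged = a.zip(b).map { case (left, right) => left ++ right }
// merged: Seq(Seq("abc", 123, 1), Seq("cde", 23, 2))
```

Just like RDD zip, this only lines up correctly because both sequences have the same length and order.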
If the above conditions are not met, the only option that comes to mind is adding an index and joining:
def addIndex(df: DataFrame) = sqlContext.createDataFrame(
  // Add index
  df.rdd.zipWithIndex.map { case (r, i) => Row.fromSeq(r.toSeq :+ i) },
  // Create schema
  StructType(df.schema.fields :+ StructField("_index", LongType, false))
)

// Add indices
val aWithIndex = addIndex(a)
val bWithIndex = addIndex(b)

// Join and clean
val ab = aWithIndex
  .join(bWithIndex, Seq("_index"))
  .drop("_index")
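The add-index-and-join approach can likewise be sketched with plain Scala collections (names here are illustrative). One detail worth seeing: like Spark's default inner join, rows whose index has no match on the other side are silently dropped.

```scala
// Plain-Scala sketch of the add-index-and-join idea (no Spark needed).
// a has one more row than b, to show the inner-join behaviour.
val a = Seq(Seq("abc", 123), Seq("cde", 23), Seq("xyz", 7))
val b = Seq(Seq(1), Seq(2))

// Tag every row with its position (playing the role of "_index").
val aIdx = a.zipWithIndex.map { case (r, i) => i -> r }.toMap
val bIdx = b.zipWithIndex.map { case (r, i) => i -> r }.toMap

// Inner join on the index, then "drop" it by emitting only the fields.
val ab = aIdx.keySet.intersect(bIdx.keySet).toSeq.sorted
  .map(i => aIdx(i) ++ bIdx(i))
// ab: Seq(Seq("abc", 123, 1), Seq("cde", 23, 2)) -- the "xyz" row is dropped
```

This is also why zipWithIndex is used rather than something like monotonically_increasing_id: the join key must be the same consecutive position on both sides.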
Regarding "scala - How to zip two (or more) DataFrames in Spark", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/32882529/