我有一个数据框,我想按列分组并将这些组转回具有相同架构的数据框。原因是我想跨组映射一个带有签名 DataFrame -> String
的函数。这是我正在尝试的:
val df = sc.parallelize(Seq((1,2,3),(1,2,4),(2,3,4))).toDF
val schema = df.schema
val groups = df.rdd.groupBy(x => x(0))
.mapValues(g => sqlContext.createDataFrame(sc.makeRDD(g.toList), schema))
.take(1)
这是我希望的:
scala> groups(0)._2.collect
Array[org.apache.spark.sql.Row] = Array([1,2,3], [1,2,4])
但它不起作用(任务因 NullPointerException
而失败)...我猜你不能映射一个引用 spark 上下文的函数,但我不确定如何实现此目的?
最佳答案
I guess you cant map a function that refers to the spark context
正确 - 您不能在传递给任何 Spark 高阶函数的函数中使用任何 Spark 上下文对象(或 RDD 或数据帧),因为这需要序列化这些对象和将它们发送给执行程序,但它们是有意不可序列化的,因为这没有意义(每个执行程序都必须像另一个驱动程序应用程序一样运行)。
要实现只包含一个“组”的 Dataframe,我建议使用 filter
而不是 groupBy
:您可以先收集
所有组键,然后将每个键映射到过滤后的 Dataframe:
val df = sc.parallelize(Seq((1,2,3),(1,2,4),(2,3,4))).toDF
df.cache() // EDIT: this might speed this up significantly, as DF will be reused instead of recalculated for each key
val groupKeys: Array[Int] = df.map { case Row(i: Int, _, _) => i }.distinct().collect()
val dfPerKey: Array[DataFrame] = groupKeys.map(k => df.filter($"_1" === k))
dfPerKey.foreach(_.show())
// prints:
// +---+---+---+
// | _1| _2| _3|
// +---+---+---+
// | 1| 2| 3|
// | 1| 2| 4|
// +---+---+---+
//
// +---+---+---+
// | _1| _2| _3|
// +---+---+---+
// | 2| 3| 4|
// +---+---+---+
关于scala - 如何从分组数据中获取 Spark 数据帧,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39177362/