scala - 如何从分组数据中获取 Spark 数据帧

标签 scala apache-spark

我有一个数据框,我想按列分组并将这些组转回具有相同架构的数据框。原因是我想跨组映射一个带有签名 DataFrame -> String 的函数。这是我正在尝试的:

val df = sc.parallelize(Seq((1,2,3),(1,2,4),(2,3,4))).toDF
val schema = df.schema
val groups = df.rdd.groupBy(x => x(0))
               .mapValues(g => sqlContext.createDataFrame(sc.makeRDD(g.toList), schema))
               .take(1)

这是我希望的:

scala> groups(0)._2.collect
Array[org.apache.spark.sql.Row] = Array([1,2,3], [1,2,4])    

但它不起作用(任务因 NullPointerException 而失败)...我猜你不能映射一个引用 spark 上下文的函数,但我不确定如何实现此目的?

最佳答案

I guess you cant map a function that refers to the spark context

正确 - 您不能在传递给任何 Spark 高阶函数的函数中使用任何 Spark 上下文对象(或 RDD 或数据帧),因为这需要序列化这些对象和将它们发送给执行程序,但它们是有意不可序列化的,因为这没有意义(每个执行程序都必须像另一个驱动程序应用程序一样运行)。

要实现只包含一个“组”的 Dataframe,我建议使用 filter 而不是 groupBy:您可以先收集所有组键,然后将每个键映射到过滤后的 Dataframe:

val df = sc.parallelize(Seq((1,2,3),(1,2,4),(2,3,4))).toDF

df.cache() // EDIT: this might speed this up significantly, as DF will be reused instead of recalculated for each key 

val groupKeys: Array[Int] = df.map { case Row(i: Int, _, _) => i }.distinct().collect()
val dfPerKey: Array[DataFrame] = groupKeys.map(k => df.filter($"_1" === k))

dfPerKey.foreach(_.show())
// prints:
//    +---+---+---+
//    | _1| _2| _3|
//    +---+---+---+
//    |  1|  2|  3|
//    |  1|  2|  4|
//    +---+---+---+
//
//    +---+---+---+
//    | _1| _2| _3|
//    +---+---+---+
//    |  2|  3|  4|
//    +---+---+---+

关于scala - 如何从分组数据中获取 Spark 数据帧,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39177362/

相关文章:

postgresql - Spark Dataframes UPSERT 到 Postgres 表

arrays - Scala 二维数组按主列和辅助列排序

scala - 可序列化对象的用法 : Caused by: java. io.NotSerializedException

apache-spark - 过滤并保存数据帧的前 X 行

python-3.x - 如何在不使用Pyspark中的collect()方法的情况下将pyspark.rdd.PipelinedRDD转换为数据框?

amazon-ec2 - Spark - AWS EMR 集群首选哪种实例类型?

scala - 奇怪的并行收集行为

scala - 从 Scala 中的另一个类访问变量

apache-spark - 忽略了 JSON 阅读器中的 Spark 采样选项?

scala - 带有 “No TypeTag available”的Scala/Spark应用程序 “def main”风格的应用程序出错