我有一个类似于以下的数据框:
val df = sc.parallelize(Seq((100, 1, 1), (100, 1,2), (100, 2,3), (200, 1,1), (200, 2,3), (200, 2, 2), (200, 3, 1), (200, 3,2), (300, 1,1), (300,1,2), (300, 2,5), (400, 1, 6))).toDF("_c0", "_c1", "_c2")
+---+---+--------------------+
|_c0|_c1| _c2|
+---+---+--------------------+
|100| 1|1 |
|100| 1|2 |
|100| 2|3 |
|200| 1|1 |
|200| 2|3 |
|200| 2|2 |
|200| 3|1 |
|200| 3|2 |
|300| 1|1 |
|300| 1|2 |
|300| 2|5 |
|400| 1|6 |
我需要对 _c0 和 _c1 进行 groupBy 并获取一些像这样的 rdd:
res9: Array[Array[Array[Int]]] = Array(Array(Array(1, 2), Array(3)), Array(Array(1), Array(3, 2), Array(1, 2)), Array(Array(1, 2), Array(5)), Array(Array(6)))
它是一个数组的数组,我是scala的新手。请尝试提供帮助
最佳答案
您可以先将 groupBy
_c0
和 _c1
一起使用,然后仅使用 groupBy
_c1
> 以获得您想要的结果。下面是相同的代码。
//first group by "_c0" and "_c1"
val res = df.groupBy("_c0", "_c1").agg(collect_list("_c2").as("_c2"))
//group by "_c0"
.groupBy("_c0").agg(collect_list("_c2").as("_c2"))
.select("_c2")
res.show(false)
//output
//+---------------------------------------------------------+
//|_c2 |
//+---------------------------------------------------------+
//|[WrappedArray(1, 2), WrappedArray(5)] |
//|[WrappedArray(1, 2), WrappedArray(3)] |
//|[WrappedArray(6)] |
//|[WrappedArray(3, 2), WrappedArray(1, 2), WrappedArray(1)]|
//+---------------------------------------------------------+
要将其转换为 RDD
,请使用 .rdd
来生成 dataframe
。
import scala.collection.mutable.WrappedArray
val rdd = res.rdd.map(x => x.get(0)
.asInstanceOf[WrappedArray[WrappedArray[Int]]].array.map(x => x.toArray))
//to get the content or rdd(Don't use it if data is too big)
rdd.collect()
//output
//Array(Array(Array(1, 2), Array(5)), Array(Array(1, 2), Array(3)), Array(Array(6)), Array(Array(3, 2), Array(1, 2), Array(1)))
关于arrays - scala 中的 groupBy 多个键,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50906189/