I have a requirement implemented with RDDs:
val test = Seq(("New York", "Jack"),
("Los Angeles", "Tom"),
("Chicago", "David"),
("Houston", "John"),
("Detroit", "Michael"),
("Chicago", "Andrew"),
("Detroit", "Peter"),
("Detroit", "George")
)
sc.parallelize(test).groupByKey().mapValues(_.toList).foreach(println)
The result is:
(New York,List(Jack))
(Detroit,List(Michael, Peter, George))
(Los Angeles,List(Tom))
(Houston,List(John))
(Chicago,List(David, Andrew))
How can I do this with Datasets in Spark 2.0?
I have an approach using a custom function (roughly the sketch below), but it feels overly complicated. Is there a simpler way?
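For reference, what I have is roughly the following, a sketch using the typed Dataset API (groupByKey plus mapGroups); the Person case class and the SparkSession value spark are assumptions for illustration, not part of the original code:
import spark.implicits._

// Hypothetical record type for the typed API.
case class Person(city: String, name: String)

val ds = test.map { case (city, name) => Person(city, name) }.toDS()

ds.groupByKey(_.city)                // typed grouping on the city field
  .mapGroups { (city, people) =>     // hand-written aggregation per group
    (city, people.map(_.name).toList)
  }
  .show(false)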
Best Answer
I would suggest you start by creating a case class:
case class Monkey(city: String, firstName: String)
This case class should be defined outside your main class. Then you can use the toDS function, and use groupBy together with the collect_list aggregation function, as follows (with a SparkSession named spark, import spark.implicits._ is the Spark 2.x equivalent):
import sqlContext.implicits._
import org.apache.spark.sql.functions._
val test = Seq(("New York", "Jack"),
("Los Angeles", "Tom"),
("Chicago", "David"),
("Houston", "John"),
("Detroit", "Michael"),
("Chicago", "Andrew"),
("Detroit", "Peter"),
("Detroit", "George")
)
sc.parallelize(test)
.map(row => Monkey(row._1, row._2))
.toDS()
.groupBy("city")
.agg(collect_list("firstName") as "list")
.show(false)
You will get the following output:
+-----------+------------------------+
|city |list |
+-----------+------------------------+
|Los Angeles|[Tom] |
|Detroit |[Michael, Peter, George]|
|Chicago |[David, Andrew] |
|Houston |[John] |
|New York |[Jack] |
+-----------+------------------------+
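Note that groupBy followed by agg returns an untyped DataFrame. If you would rather keep a typed Dataset, one option is to call as with a tuple type; this is a sketch under the same imports as above, and it relies on the two tuple columns lining up positionally with city and list:
val grouped = sc.parallelize(test)
  .map(row => Monkey(row._1, row._2))
  .toDS()
  .groupBy("city")
  .agg(collect_list("firstName") as "list")

// Dataset[(String, Seq[String])] instead of a plain DataFrame.
val typed = grouped.as[(String, Seq[String])]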
You can always convert back to an RDD simply by calling the .rdd function, as sketched below.
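For example, a minimal sketch reusing the grouped value from above; .rdd on a DataFrame yields an RDD[Row], so the typed values are pulled out of each Row by hand:
import org.apache.spark.rdd.RDD

// getString/getSeq extract the city and the collected name list from each Row.
val backToRdd: RDD[(String, Seq[String])] = grouped.rdd
  .map(r => (r.getString(0), r.getSeq[String](1)))

backToRdd.foreach(println)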
Regarding apache-spark - how to do a group-by with Datasets, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/44404817/