apache-spark - 如何使用数据集进行分组

标签 apache-spark dataset apache-spark-2.0

我有一个使用 rdd 的请求:

val test = Seq(("New York", "Jack"),
    ("Los Angeles", "Tom"),
    ("Chicago", "David"),
    ("Houston", "John"),
    ("Detroit", "Michael"),
    ("Chicago", "Andrew"),
    ("Detroit", "Peter"),
    ("Detroit", "George")
  )
sc.parallelize(test).groupByKey().mapValues(_.toList).foreach(println)
结果是:

(New York,List(Jack))

(Detroit,List(Michael, Peter, George))

(Los Angeles,List(Tom))

(Houston,List(John))

(Chicago,List(David, Andrew))


如何在 spark2.0 中使用数据集?
我有自定义函数的使用方法,但是感觉好复杂,有没有简单点的方法?

最佳答案

我建议您从创建 case class 开始作为

case class Monkey(city: String, firstName: String)

case class应该在主类之外定义。然后你就可以使用 toDS功能及用途groupByaggregation函数调用 collect_list如下
import sqlContext.implicits._
import org.apache.spark.sql.functions._

val test = Seq(("New York", "Jack"),
  ("Los Angeles", "Tom"),
  ("Chicago", "David"),
  ("Houston", "John"),
  ("Detroit", "Michael"),
  ("Chicago", "Andrew"),
  ("Detroit", "Peter"),
  ("Detroit", "George")
)
sc.parallelize(test)
  .map(row => Monkey(row._1, row._2))
  .toDS()
  .groupBy("city")
  .agg(collect_list("firstName") as "list")
  .show(false)

你将有输出
+-----------+------------------------+
|city       |list                    |
+-----------+------------------------+
|Los Angeles|[Tom]                   |
|Detroit    |[Michael, Peter, George]|
|Chicago    |[David, Andrew]         |
|Houston    |[John]                  |
|New York   |[Jack]                  |
+-----------+------------------------+

您可以随时转换回 RDD只需调用 .rdd功能

关于apache-spark - 如何使用数据集进行分组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44404817/

相关文章:

python - Apache 星火 : How to use pyspark with Python 3

mysql - 在 spark 中连接具有重复列名的表

scala - 函数与整数值比较时产生 Spark

apache-spark - Spark Dataframe中的reducebykey和aggregatebykey

apache-spark - 为什么 Apache Livy session 显示应用程序 ID 为 NULL?

scala - Spark 的 RDD.map() 不会执行,除非 RDD 内的项被访问

python - 如何在 Spark 中使用 Sklearn 模型进行预测?

asp.net - 如何将分页应用于数据集?

python - 从 6GB csv 文件采样,无需在 Python 中加载

pyspark - 在独立集群上运行 spark 时出错