I have a dataset like the following:
{"TS":"1461768452", "IP":"10.10.144.209", "ID":"KA4aIkFB", "DEVICE":"Tablet", "HOST":"krxd.net"}
I am trying to do the following:
Group all records by (ID, DEVICE) and get the latest timestamp for each group, then keep only distinct IDs.
Can someone show me how to do this in Scala? I know that in Pig we can nest foreach loops. Is there an equivalent in Spark?
This is what I have so far:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("sample.json")
val df2 = df.select(df("ID"), df("DEVICE"), df("TS"))
val res= df2.rdd.groupBy ( x => (x(0),x(1) )).mapValues ( x=> x.foreach { x => x(2)})
val res1 = res.mapValues(_.maxBy(_.get(2)))
The last two lines seem to be problematic.
Best Answer
You can do this directly in Spark SQL (groupBy plus an aggregation on GroupedData); there is no need to convert the DataFrame to an RDD:
Test JSON file: test.json
{"TS":"1461768452", "ID":"KA4aIkFA", "DEVICE":"Tablet", "HOST":"krxd.net" }
{"TS":"1461768462", "ID":"KA4aIkFA", "DEVICE":"Tablet", "HOST":"krxd.net" }
{"TS":"1461768472", "ID":"KA4aIkFB", "DEVICE":"Tablet", "HOST":"krxd.net" }
{"TS":"1461768482", "ID":"KA4aIkFB", "DEVICE":"Tablet", "HOST":"krxd.net" }
{"TS":"1461768492", "ID":"KA4aIkFB", "DEVICE":"Phone", "HOST":"krxd.net" }
Scala code:
val df = sqlContext.read.json("test.json")
df.show
+------+--------+--------+----------+
|DEVICE|    HOST|      ID|        TS|
+------+--------+--------+----------+
|Tablet|krxd.net|KA4aIkFA|1461768452|
|Tablet|krxd.net|KA4aIkFA|1461768462|
|Tablet|krxd.net|KA4aIkFB|1461768472|
|Tablet|krxd.net|KA4aIkFB|1461768482|
| Phone|krxd.net|KA4aIkFB|1461768492|
+------+--------+--------+----------+
import org.apache.spark.sql.functions.max  // needed for the max() aggregation below

val newDF = df.select("ID", "DEVICE", "TS")
              .groupBy("ID", "DEVICE")
              .agg(max(df("TS")) as "TS")
newDF.show()
+--------+------+----------+
|      ID|DEVICE|        TS|
+--------+------+----------+
|KA4aIkFB| Phone|1461768492|
|KA4aIkFA|Tablet|1461768462|
|KA4aIkFB|Tablet|1461768482|
+--------+------+----------+
newDF.dropDuplicates("ID").show()
+--------+------+----------+
|      ID|DEVICE|        TS|
+--------+------+----------+
|KA4aIkFA|Tablet|1461768462|
|KA4aIkFB| Phone|1461768492|
+--------+------+----------+
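For intuition, the same group-then-latest-then-distinct logic can be sketched on plain Scala collections, without Spark. This is only an illustrative analogue (the tuples and names below are made up for the example); note it deviates from `dropDuplicates("ID")` in one respect: `dropDuplicates` keeps an arbitrary row per ID, while here we deterministically keep the row with the largest TS.

```scala
// Each record mirrors the (ID, DEVICE, TS) columns selected above.
val records = Seq(
  ("KA4aIkFA", "Tablet", "1461768452"),
  ("KA4aIkFA", "Tablet", "1461768462"),
  ("KA4aIkFB", "Tablet", "1461768472"),
  ("KA4aIkFB", "Tablet", "1461768482"),
  ("KA4aIkFB", "Phone",  "1461768492")
)

// Step 1: group by (ID, DEVICE), keep the row with the max TS per group
// (string comparison is fine here because all timestamps have equal length).
val latestPerGroup: Seq[(String, String, String)] =
  records.groupBy(r => (r._1, r._2))
         .values
         .map(rows => rows.maxBy(_._3))
         .toSeq

// Step 2: collapse to one row per ID, deterministically keeping the latest TS.
val onePerId: Seq[(String, String, String)] =
  latestPerGroup.groupBy(_._1)
                .values
                .map(rows => rows.maxBy(_._3))
                .toSeq
```

The two `groupBy`/`maxBy` passes correspond to the DataFrame's `groupBy("ID", "DEVICE").agg(max(...))` and the per-ID deduplication step, respectively.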
Regarding "apache-spark - Spark nested foreach", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/39261374/