apache-spark - Spark 嵌套 foreach

标签 apache-spark foreach nested

我有一个数据集如下

{"TS":"1461768452", "IP":"10.10.144.209", "ID":"KA4aIkFB", "DEVICE":"平板电脑", "HOST":"krxd.net"}

我正在尝试执行以下操作

按(id,设备)对所有记录进行分组,并为每个组获取最新时间戳。 然后对 ID 进行区分。

有人可以指导我如何在 Scala 中执行此操作吗? 我知道在 Pig 中我们可以嵌套 foreach 循环。spark 中有等效的吗?

这就是我到目前为止所做的

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext1.read.json("sample.json")
val df2 = df1.select(df1("ID"),df1("DEVICE"),df1("TS"))
val res= df2.rdd.groupBy ( x => (x(0),x(1) )).mapValues ( x=> x.foreach { x => x(2)})
val res1 = res.mapValues(_.maxBy(_.get(2)))

最后 2 条记录似乎有问题。

最佳答案

你可以直接在spark sql中做这样的事情(groupBy和聚合GroupedData),不需要将DataFrame转换成RDD:

测试json文件:test.json

{"TS":"1461768452", "ID":"KA4aIkFA", "DEVICE":"Tablet", "HOST":"krxd.net" }
{"TS":"1461768462", "ID":"KA4aIkFA", "DEVICE":"Tablet", "HOST":"krxd.net" }
{"TS":"1461768472", "ID":"KA4aIkFB", "DEVICE":"Tablet", "HOST":"krxd.net" }
{"TS":"1461768482", "ID":"KA4aIkFB", "DEVICE":"Tablet", "HOST":"krxd.net" }
{"TS":"1461768492", "ID":"KA4aIkFB", "DEVICE":"Phone", "HOST":"krxd.net" }

scala 代码:

val df = sqlContext.read.json("test.json")
df.show
+------+--------+--------+----------+
|DEVICE|    HOST|      ID|        TS|
+------+--------+--------+----------+
|Tablet|krxd.net|KA4aIkFA|1461768452|
|Tablet|krxd.net|KA4aIkFA|1461768462|
|Tablet|krxd.net|KA4aIkFB|1461768472|
|Tablet|krxd.net|KA4aIkFB|1461768482|
| Phone|krxd.net|KA4aIkFB|1461768492|
+------+--------+--------+----------+

val newDF =  df.select("ID", "DEVICE", "TS")
               .groupBy("ID", "DEVICE")
               .agg(max(df("TS")) as "TS")
newDF.show()
+--------+------+----------+
|      ID|DEVICE|   TS     |
+--------+------+----------+
|KA4aIkFB| Phone|1461768492|
|KA4aIkFA|Tablet|1461768462|
|KA4aIkFB|Tablet|1461768482|
+--------+------+----------+

newDF.dropDuplicates("ID").show()
+--------+------+----------+
|      ID|DEVICE|   TS     |
+--------+------+----------+
|KA4aIkFA|Tablet|1461768462|
|KA4aIkFB| Phone|1461768492|
+--------+------+----------+

关于apache-spark - Spark 嵌套 foreach,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39261374/

相关文章:

java - 在 EMR 上提交 JAR 时出现 ClassNotFoundException

php - 添加计数到 jQuery 在 PHP foreach 中添加的行

python - 如何使用 Python 打印嵌套字典中的特定整数变量?

hadoop - spark2-shell 中的 Log4j

hadoop - 使用 --proxy-user、--keytab 和 --principal 参数在 hadoop kerberos 中提交 spark-submit

loops - lisp 错误处理 - 检查变量是否为 nil

php - Woocommerce wp_query 按 ID 获取订单

list - 悬停时是否可以更改嵌套列表的颜色?

mysql - 如何使用 MySQL 在表中创建嵌套 AUTO_INCRMENT 字段?

python - 使用 Spark 获取值超过某个阈值的所有列的名称