环境:Java 1.8、VM Cloudera 快速入门。 我将 csv 文件中的数据存入 Hadoop hdfs。每行代表一条公交路线。
id vendor start_datetime end_datetime trip_duration_in_sec
17534 A 1/1/2013 12:00 1/1/2013 12:14 840
68346 A 1/1/2013 12:13 1/1/2013 12:18 300
09967 B 1/1/2013 12:34 1/1/2013 12:39 300
09967 B 1/1/2013 12:44 1/1/2013 12:51 420
09967 A 1/1/2013 12:54 1/1/2013 12:56 120
.........
.........
所以,我想要每天找到每个供应商(A 和 B)拥有最多巴士路线的时间。用java和spark。 结果可能是:
1/1/2013 (Day 1) - Vendor A has 3 bus routes at 12:00-13:00 hour. (That time 12:00-13:00, vendor A had the most bus routes..)
1/1/2013 (Day 1) - Vendor B has 2 bus routes at 12:00-13:00 hour. (That time 12:00-13:00, vendor B had the most bus routes..)
....
Mu java代码是:
import static org.apache.spark.sql.functions;
import static org.apache.spark.sql.Row;
Dataset<Row> ds;
ds.groupBy(functions.window(col("start_datetime"), "1 hour").count().show();
但我找不到每天最多的路线是在哪个小时。
最佳答案
我对 Java 不太熟悉,所以我尝试用 Scala 来解释它。
找出每个供应商每天最大路线的小时数的关键是按 (vendor, day, hour)
来计数,然后按 (vendor, day)
聚合计算每组最大cnt对应的小时。 day
和 hour
每条记录可以通过 start_datetime
进行解析.
val df = spark.createDataset(Seq(
("17534","A","1/1/2013 12:00","1/1/2013 12:14",840),
("68346","A","1/1/2013 12:13","1/1/2013 12:18",300),
("09967","B","1/1/2013 12:34","1/1/2013 12:39",300),
("09967","B","1/1/2013 12:44","1/1/2013 12:51",420),
("09967","A","1/1/2013 12:54","1/1/2013 12:56",120)
)).toDF("id","vendor","start_datetime","end_datetime","trip_duration_in_sec")
df.rdd.map(t => {
val vendor = t(1)
val day = t(2).toString.split(" ")(0)
val hour = t(2).toString.split(" ")(1).split(":")(0)
((vendor, day, hour), 1)
})
// count by key
.aggregateByKey(0)((x: Int, y: Int) =>x+y, (x: Int, y: Int) =>x+y)
.map(t => {
val ((vendor, day, hour), cnt) = t;
((vendor, day), (hour, cnt))
})
// solve the max cnt by key (vendor, day)
.foldByKey(("", 0))((z: (String, Int), i: (String, Int)) => if (i._2 > z._2) i else z)
.foreach(t => println(s"${t._1._2} - Vendor ${t._1._1} has ${t._2._2} bus routes from ${t._2._1}:00 hour."))
关于java - 从 Java 中的 DataFrame 查找每天的最大行程 - Spark,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59604881/