java - Find the maximum trips per day from a DataFrame in Java - Spark

Tags: java apache-spark hadoop

Environment: Java 1.8, Cloudera QuickStart VM. I loaded the data from a CSV file into Hadoop HDFS. Each row represents one bus route.

    id      vendor    start_datetime   end_datetime    trip_duration_in_sec
    17534   A         1/1/2013 12:00   1/1/2013 12:14  840
    68346   A         1/1/2013 12:13   1/1/2013 12:18  300
    09967   B         1/1/2013 12:34   1/1/2013 12:39  300
    09967   B         1/1/2013 12:44   1/1/2013 12:51  420
    09967   A         1/1/2013 12:54   1/1/2013 12:56  120
    .........
    .........

So, for each day, I want to find the hour in which each vendor (A and B) has the most bus routes, using Java and Spark. The result could look like this:

1/1/2013 (Day 1) - Vendor A has 3 bus routes in the 12:00-13:00 hour. (Vendor A had the most bus routes during 12:00-13:00.)
1/1/2013 (Day 1) - Vendor B has 2 bus routes in the 12:00-13:00 hour. (Vendor B had the most bus routes during 12:00-13:00.)
....

My Java code is:

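import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.to_timestamp;
import static org.apache.spark.sql.functions.window;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> ds; // loaded from the CSV file on HDFS
// window() needs a timestamp column; the "M/d/yyyy H:mm" pattern is assumed from the sample data (to_timestamp needs Spark 2.2+)
ds.groupBy(window(to_timestamp(col("start_datetime"), "M/d/yyyy H:mm"), "1 hour")).count().show();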

But I can't figure out how to find, for each day, the hour with the most routes.

Best answer

I'm not very familiar with Java, so I'll try to explain it in Scala.

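The key to finding, for each vendor and day, the hour with the most routes is to count by (vendor, day, hour), then aggregate by (vendor, day) and pick the hour with the largest cnt in each group. The day and hour of each record can be parsed from start_datetime.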
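The Scala code in this answer gets the day and hour by splitting the string on spaces and colons. As an alternative sketch, assuming start_datetime always follows the `M/d/yyyy H:mm` pattern on a 24-hour clock (the sample rows do not state this explicitly), `java.time` can parse it once and return both keys. The class `DayHourKey` and its methods are hypothetical names for illustration.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class DayHourKey {
    // Assumed pattern for values such as "1/1/2013 12:54" (24-hour clock).
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("M/d/yyyy H:mm");

    // Day part of start_datetime, e.g. 2013-01-01.
    static LocalDate day(String startDatetime) {
        return LocalDateTime.parse(startDatetime, FMT).toLocalDate();
    }

    // Hour of day (0-23) of start_datetime, e.g. 12.
    static int hour(String startDatetime) {
        return LocalDateTime.parse(startDatetime, FMT).getHour();
    }

    public static void main(String[] args) {
        String s = "1/1/2013 12:54";
        System.out.println(day(s) + " hour " + hour(s)); // 2013-01-01 hour 12
    }
}
```

Compared with string splitting, a value that does not match the pattern throws `DateTimeParseException` instead of silently producing a wrong key, and the hour comes back as an `int`, so hours compare numerically ("9" before "13").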

// assumes a SparkSession named spark, as in spark-shell
import spark.implicits._

val df = spark.createDataset(Seq(
("17534","A","1/1/2013 12:00","1/1/2013 12:14",840),
("68346","A","1/1/2013 12:13","1/1/2013 12:18",300),
("09967","B","1/1/2013 12:34","1/1/2013 12:39",300),
("09967","B","1/1/2013 12:44","1/1/2013 12:51",420),
("09967","A","1/1/2013 12:54","1/1/2013 12:56",120)
)).toDF("id","vendor","start_datetime","end_datetime","trip_duration_in_sec")

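// key each trip by (vendor, day, hour) parsed from start_datetime
df.rdd.map(t => {
    val vendor = t.getString(1)
    val day = t.getString(2).split(" ")(0)                  // e.g. "1/1/2013"
    val hour = t.getString(2).split(" ")(1).split(":")(0)   // e.g. "12"
    ((vendor, day, hour), 1)
})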
// count by key
.aggregateByKey(0)((x: Int, y: Int) =>x+y, (x: Int, y: Int) =>x+y) 
.map(t => {
    val ((vendor, day, hour), cnt) = t;
    ((vendor, day), (hour, cnt))
})
// keep the (hour, cnt) pair with the largest cnt per (vendor, day)
.foldByKey(("", 0))((z: (String, Int), i: (String, Int)) => if (i._2 > z._2) i else z)
// collect to the driver so the lines print there rather than in executor logs
.collect()
.foreach(t => println(s"${t._1._2} - Vendor ${t._1._1} has ${t._2._2} bus routes in the ${t._2._1}:00 hour."))
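Since the question asks for Java, here is a plain-Java sketch (no Spark, only `java.util.stream`, Java 8 compatible) of the same two steps on the sample rows: count by (vendor, day, hour), then keep the busiest hour per (vendor, day). The class and method names are hypothetical. One deliberate difference from the Scala code: its `foldByKey` keeps whichever candidate it saw first when two hours tie, which depends on the order in which partitions are merged, so this sketch breaks ties by taking the earlier hour.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class BusiestHour {
    // Sample rows from the question, reduced to {vendor, start_datetime}.
    static List<String[]> sampleRows() {
        return Arrays.asList(
            new String[]{"A", "1/1/2013 12:00"},
            new String[]{"A", "1/1/2013 12:13"},
            new String[]{"B", "1/1/2013 12:34"},
            new String[]{"B", "1/1/2013 12:44"},
            new String[]{"A", "1/1/2013 12:54"});
    }

    // Returns (vendor, day) -> (hour, count) for the busiest hour of that day.
    static Map<List<String>, Map.Entry<Integer, Long>> busiestHour(List<String[]> rows) {
        // Step 1: count trips per (vendor, day, hour), like aggregateByKey in the answer.
        Map<List<String>, Long> counts = rows.stream().collect(Collectors.groupingBy(r -> {
            String[] parts = r[1].split(" ");      // ["1/1/2013", "12:00"]
            String hour = parts[1].split(":")[0];  // "12"
            return Arrays.asList(r[0], parts[0], hour);
        }, Collectors.counting()));

        // Step 2: keep the best (hour, count) per (vendor, day), like foldByKey.
        Map<List<String>, Map.Entry<Integer, Long>> best = new HashMap<>();
        counts.forEach((key, cnt) -> best.merge(
            Arrays.asList(key.get(0), key.get(1)),
            new SimpleEntry<>(Integer.parseInt(key.get(2)), cnt),
            BusiestHour::better));
        return best;
    }

    // Higher count wins; on a tie the earlier hour wins, regardless of merge order.
    static Map.Entry<Integer, Long> better(Map.Entry<Integer, Long> a, Map.Entry<Integer, Long> b) {
        int byCount = Long.compare(a.getValue(), b.getValue());
        if (byCount != 0) return byCount > 0 ? a : b;
        return a.getKey() <= b.getKey() ? a : b;
    }

    public static void main(String[] args) {
        busiestHour(sampleRows()).forEach((k, v) -> System.out.println(
            k.get(1) + " - Vendor " + k.get(0) + " has " + v.getValue()
                + " bus routes at " + v.getKey() + ":00-" + (v.getKey() + 1) + ":00"));
    }
}
```

With the five sample rows, both vendors' busiest hour is 12:00-13:00, with 3 routes for A and 2 for B. Because the result is a `HashMap`, the order in which the two lines are printed is not fixed.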

Regarding java - Find the maximum trips per day from a DataFrame in Java - Spark, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/59604881/
