scala - 如何从scala中的RDD中获取最早的时间戳日期

标签 scala apache-spark mapreduce

我有一个类似于 ((String, String), TimeStamp) 的 RDD。我有大量记录,我想为每个键选择 具有最新 TimeStamp 值的记录。我已经尝试了以下代码,但仍在努力解决这个问题。有人可以帮我做这个吗?

我尝试的以下代码是错误的,并且不能正常工作

val context = sparkSession.read.format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("url", url)
  .option("dbtable", "student_risk")
  .option("user", "user")
  .option("password", "password")
  .load()
context.cache();

val studentRDD = context.rdd.map(r => ((r.getString(r.fieldIndex("course_id")), r.getString(r.fieldIndex("student_id"))), r.getTimestamp(r.fieldIndex("risk_date_time"))))
val filteredRDD = studentRDD.collect().map(z => (z._1, z._2)).reduce((x, y) => (x._2.compareTo(y._2)))

最佳答案

直接在DataFrame上很容易做到(这里奇怪地命名为context):

val result = context
  .groupBy("course_id", "student_id")
  .agg(min("risk_date_time") as "risk_date_time")

然后您可以像以前一样将其转换为 RDD(如果需要) - 结果具有相同的架构。

如果你确实想在 RDD 上执行此操作,请使用 reduceByKey:

studentRDD.reduceByKey((t1, t2) => if (t1.before(t2)) t1 else t2)

关于scala - 如何从scala中的RDD中获取最早的时间戳日期,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42268827/

相关文章:

apache-spark - Spark 结构流和批处理是否相同?

memory - Apache Spark - 内存管理

java - 如何使用spark Streaming从HDFS读取数据?

hadoop - 我想用 PIG 中的 AVG 替换 NULL 值

file - hadoop中的序列文件是什么?

Java/Scala : Assign the program/application arguments based on the machine name/ip

postgresql - 使用 Play Slick 在 PostgreSQL 中保留 UUID - java.sql.BatchUpdateException

scala - Spark : Efficient way to get top K frequent values per key in (key, 值)RDD?

mysql - [运行时异常 : SqlMappingError(No rows when expecting a single one)]

scala - 不能将 PartialFunction 放在 Scala 类构造函数中