java - 按列分组并对 csv 文件进行排序 Spark

标签 java apache-spark apache-spark-sql

下面是我正在处理的 csv 文件的示例:

life id,policy id,benefit id,date of commencment,status
xx_0,0,0,11/11/2017,active
xx_0,0,0,12/12/2017,active
axb_0,1,0,10/01/2015,active
axb_0,1,0,11/10/2014,active
fxa_2,0,1,01/02/203,active

我想要做的是将数据分组( lifeid + policyid + benefitid )并按日期排序,然后采用每个组的最近(最后一个)元素对其进行一些控制。

在 Spark 上执行此操作的最佳方法是什么?

最佳答案

在 Spark 中执行此操作的最佳方法可能是使用数据帧(请参阅 How to select the first row of each group? )。但我读到您想避免使用它们。纯 RDD 解决方案可以编写如下:

val rdd = sc.parallelize(Seq("xx_0,0,0,11/11/2017,active",
    "xx_0,0,0,12/12/2017,active",
    "axb_0,1,0,10/01/2015,active",
    "axb_0,1,0,11/10/2014,active",
    "fxa_2,0,1,01/02/203,active"))

rdd
    .map(_.split(","))
    .map(x=> x.slice(0,3).reduce(_+","+_) -> 
        (new SimpleDateFormat("dd/MM/yyyy").parse(x(3)).getTime, x(4)))
    .reduceByKey((a,b) => if(a._1 > b._1) a else b)
    .map(x=> x._1+","+x._2._1+","+x._2._2)
    .collect.foreach(println)

关于java - 按列分组并对 csv 文件进行排序 Spark,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50413831/

相关文章:

java - 如何在int中添加空格?

java.lang.ClassNotFoundException : org. 用于 Spark 3.0.0 的 apache.spark.sql.sources.v2.DataSourceV2

apache-spark - Spark 结构化流与 ElasticSearch 集成

scala - Spark 多个动态聚合函数,countDistinct 不起作用

python - 高效的字符串后缀检测

Java快速检查网络连接

java - 编写使用 TTS(文本转语音)的程序应该从哪里开始?

java - 打印网络服务器中存在的目录结构

apache-spark - 如果s3文件夹路径不存在,Spark会创建它吗?

java - 数组中的 2 个值到我的 DataFrame 中的 2 列