scala - Spark 从 DataFrame 中删除重复行

标签 scala apache-spark dataframe apache-spark-sql

这个问题在这里已经有了答案:





How to select the first row of each group?

(9 个回答)


5年前关闭。




假设我有一个 DataFrame 像:

val json = sc.parallelize(Seq("""{"a":1, "b":2, "c":22, "d":34}""","""{"a":3, "b":9, "c":22, "d":12}""","""{"a":1, "b":4, "c":23, "d":12}"""))
val df = sqlContext.read.json(json)

我想根据列“b”的值删除列“a”的重复行。即,如果列“a”有重复的行,我想保留“b”值较大的行。对于上面的例子,经过处理,我只需要

{"a":3, "b":9, "c":22, "d":12}





{"a":1, "b":4, "c":23, "d":12}



Spark DataFrame dropDuplicates API 似乎不支持这一点。使用 RDD 方法,我可以做一个 map().reduceByKey() ,但是有什么 DataFrame 特定的操作可以做到这一点?

感谢一些帮助,谢谢。

最佳答案

您可以在 sparksql 中使用窗口函数来实现这一点。

df.registerTempTable("x")
sqlContext.sql("SELECT a, b,c,d  FROM( SELECT *, ROW_NUMBER()OVER(PARTITION BY a ORDER BY b DESC) rn FROM x) y WHERE rn = 1").collect

这将实现您所需要的。
阅读更多关于窗口函数支持 https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

关于scala - Spark 从 DataFrame 中删除重复行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35498162/

相关文章:

scala - 如何在 spark 3.0+ 中获得一年中的一周?

apache-spark - neo4j-mazerunner,如何在 docker-compose.yml 中增加内存大小

R 合并 data.frames asof join

scala - 是否有任何理由制作 Future[Try[A]] 类型的 API 而不是 Future[A]?

mysql - docker.io - 使用 Scala 连接到 MySQL

apache-spark - Spark Streaming Bug - Windowed DStream 的窗口不起作用

r - 向空 data.frame 添加一列

arrays - Spark获取嵌套对象的数据类型

eclipse - Play 2 : "reference to form is ambiguous" error message in template

Scala:如何对泛型类的类型参数提出要求?