给定一个如下表:
+--+------------------+-----------+
|id| diagnosis_age| diagnosis|
+--+------------------+-----------+
| 1|2.1843037179180302| 315.320000|
| 1| 2.80033330216659| 315.320000|
| 1| 2.8222365762732| 315.320000|
| 1| 5.64822705794013| 325.320000|
| 1| 5.686557787521759| 335.320000|
| 2| 5.70572315231258| 315.320000|
| 2| 5.724888517103389| 315.320000|
| 3| 5.744053881894209| 315.320000|
| 3|5.7604813374292005| 315.320000|
| 3| 5.77993740687426| 315.320000|
+--+------------------+-----------+
我试图通过仅考虑每个 ID 诊断年龄最小的诊断来减少每个 ID 的记录量。在 SQL 中,您可以将表与其自身连接起来,例如:
SELECT a.id, a.diagnosis_age, a.diagnosis
FROM tbl1 a
INNER JOIN
(SELECT id, MIN(diagnosis_age) AS min_diagnosis_age
FROM tbl1
GROUP BY id) b
ON b.id = a.id
WHERE b.min_diagnosis_age = a.diagnosis_age
如果它是一个 rdd,你可以这样做:
rdd.map(lambda x: (x["id"], [(x["diagnosis_age"], x["diagnosis"])]))\
.reduceByKey(lambda x, y: x + y)\
.map(lambda x: (x[0], [i for i in x[1] if i[0] == min(x[1])[0]]))
仅使用 Spark 数据帧操作如何实现相同的效果?如果这可能的话?具体来说,没有 sql/rdd 操作。
谢谢
最佳答案
您可以将window
与first
函数一起使用,然后过滤
掉所有其他函数。
from pyspark.sql import functions as F
from pyspark.sql.window import Window
w=Window().partitionBy("id").orderBy("diagnosis_age")
df.withColumn("least_age", F.first("diagnosis_age").over(w))\
.filter("diagnosis_age=least_age").drop("least_age").show()
+---+------------------+---------+
| id| diagnosis_age|diagnosis|
+---+------------------+---------+
| 1|2.1843037179180302| 315.32|
| 3| 5.744053881894209| 315.32|
| 2| 5.70572315231258| 315.32|
+---+------------------+---------+
您也可以不使用窗口函数,使用groupBy
min
和first
:
from pyspark.sql import functions as F
df.orderBy("diagnosis_age").groupBy("id")\
.agg(F.min("diagnosis_age").alias("diagnosis_age"), F.first("diagnosis").alias("diagnosis"))\
.show()
+---+------------------+---------+
| id| diagnosis_age|diagnosis|
+---+------------------+---------+
| 1|2.1843037179180302| 315.32|
| 3| 5.744053881894209| 315.32|
| 2| 5.70572315231258| 315.32|
+---+------------------+---------+
请注意,我在 groupyBy
之前订购 By diagnosis_age
,以处理您所需的诊断值未出现在组的第一行。 但是,如果您的数据已按 diagnosis_age
排序,您可以使用上述代码而无需 orderBy
。
关于dataframe - pyspark使用每个id的最小值过滤数据帧,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60836897/