dataframe - pyspark使用每个id的最小值过滤数据帧

标签 dataframe filter pyspark conditional-statements

给定一个如下表:

+--+------------------+-----------+
|id|     diagnosis_age|  diagnosis|
+--+------------------+-----------+
| 1|2.1843037179180302| 315.320000|
| 1|  2.80033330216659| 315.320000|
| 1|   2.8222365762732| 315.320000|
| 1|  5.64822705794013| 325.320000|
| 1| 5.686557787521759| 335.320000|
| 2|  5.70572315231258| 315.320000|
| 2| 5.724888517103389| 315.320000|
| 3| 5.744053881894209| 315.320000|
| 3|5.7604813374292005| 315.320000|
| 3|  5.77993740687426| 315.320000|
+--+------------------+-----------+

我试图通过仅考虑每个 ID 诊断年龄最小的诊断来减少每个 ID 的记录量。在 SQL 中,您可以将表与其自身连接起来,例如:

SELECT a.id, a.diagnosis_age, a.diagnosis
    FROM tbl1 a
INNER JOIN
(SELECT id, MIN(diagnosis_age) AS min_diagnosis_age
    FROM tbl1
        GROUP BY id) b
ON b.id = a.id
WHERE b.min_diagnosis_age = a.diagnosis_age

如果它是一个 rdd,你可以这样做:

rdd.map(lambda x: (x["id"], [(x["diagnosis_age"], x["diagnosis"])]))\
.reduceByKey(lambda x, y: x + y)\
.map(lambda x: (x[0], [i for i in x[1] if i[0] == min(x[1])[0]]))

仅使用 Spark 数据帧操作如何实现相同的效果?如果这可能的话?具体来说,没有 sql/rdd 操作。

谢谢

最佳答案

您可以将windowfirst函数一起使用,然后过滤掉所有其他函数。

from pyspark.sql import functions as F
from pyspark.sql.window import Window
w=Window().partitionBy("id").orderBy("diagnosis_age")
df.withColumn("least_age", F.first("diagnosis_age").over(w))\
.filter("diagnosis_age=least_age").drop("least_age").show()

+---+------------------+---------+
| id|     diagnosis_age|diagnosis|
+---+------------------+---------+
|  1|2.1843037179180302|   315.32|
|  3| 5.744053881894209|   315.32|
|  2|  5.70572315231258|   315.32|
+---+------------------+---------+

您也可以不使用窗口函数,使用groupBy minfirst:

from pyspark.sql import functions as F
df.orderBy("diagnosis_age").groupBy("id")\
.agg(F.min("diagnosis_age").alias("diagnosis_age"), F.first("diagnosis").alias("diagnosis"))\
.show()
+---+------------------+---------+
| id|     diagnosis_age|diagnosis|
+---+------------------+---------+
|  1|2.1843037179180302|   315.32|
|  3| 5.744053881894209|   315.32|
|  2|  5.70572315231258|   315.32|
+---+------------------+---------+

请注意,我在 groupyBy 之前订购 By diagnosis_age,以处理您所需的诊断值未出现在的第一行但是,如果您的数据已按 diagnosis_age 排序,您可以使用上述代码而无需 orderBy

关于dataframe - pyspark使用每个id的最小值过滤数据帧,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60836897/

相关文章:

python - 通过一次追加一行来创建 Pandas 数据框

python - 如何根据特定条件从 Pandas 数据框中随机选择行?

javascript - AngularJS:如何使用 $filter 按两个表达式进行过滤

python - Pyspark错误+方法__getnewargs__([])不存在

python - Pandas 蟒 : Delete Rows of DF That Have ASCII Letters

r - 结合列表中的 df 并仅对特定值求平均值

angularjs - 如何在 AngularJS 过滤器中从数组中排除对象?

python - 构建具有动态数量字段的 Django 优雅过滤器

python - Spark 数据帧随机拆分

pandas - 将重复的 ID 与增量编号绑定(bind)