python - 从 PySpark 中的类别分布中查找值的百分位数

标签 python pandas apache-spark-sql pyspark

我有以下 PySpark 数据帧(例如 df)。它具有列nametimestampcategoryvalue

+------+-------------------+--------+-----+
|  name|          timestamp|category|value|
+------+-------------------+--------+-----+
| name1|2019-01-17 00:00:00|       A|11.23|
| name2|2019-01-17 00:00:00|       A|14.57|
| name3|2019-01-10 00:00:00|       B| 2.21|
| name4|2019-01-10 00:00:00|       B| 8.76|
| name5|2019-01-17 00:00:00|       A|18.71|
| name6|2019-01-10 00:00:00|       A|17.78|
| name7|2019-01-10 00:00:00|       A| 5.52|
| name8|2019-01-10 00:00:00|       A| 9.91|
| name9|2019-01-17 00:00:00|       B| 1.16|
|name10|2019-01-17 00:00:00|       B| 12.0|
+------+-------------------+--------+-----+

我想在上述数据框中添加一个新列,它可以为我提供包含同一类别成员的分布中每个名称值的百分位排名时间戳

我的预期输出如下:

+------+-------------------+--------+-----+---------+
|name  |timestamp          |category|value|pct_value|
+------+-------------------+--------+-----+---------+
|name1 |2019-01-17 00:00:00|A       |11.23|1        |
|name10|2019-01-17 00:00:00|B       |12.0 |2        |
|name2 |2019-01-17 00:00:00|A       |14.57|2        |
|name3 |2019-01-10 00:00:00|B       |2.21 |1        |
|name4 |2019-01-10 00:00:00|B       |8.76 |2        |
+------+-------------------+--------+-----+---------+
only showing top 5 rows

执行此操作的最佳方法是什么?

我尝试过以下方法:

import pyspark.sql.functions as F
from pyspark.sql import Window as W

w_cat = W.partitionBy('category', 'timestamp').orderBy("value")

df_new = ( df.select( '*', F.ntile(1000).over(w_cat).alias( 'pct_value' ) ) ).persist()


df_new.orderBy('name', 'timestamp').show(5,False)

这给出了正确的预期输出。但是,当我在具有数百万行的实际数据上尝试该方法时,此方法需要很长时间(数小时)。

您可以使用下面提到的代码生成上面给出的数据帧 (df):

Stats = Row("name", "timestamp", "category", "value")

stat1 = Stats('name1', "2019-01-17 00:00:00", "A", 11.23)
stat2 = Stats('name2', "2019-01-17 00:00:00", "A", 14.57)
stat3 = Stats('name3', "2019-01-10 00:00:00", "B", 2.21)
stat4 = Stats('name4', "2019-01-10 00:00:00", "B", 8.76)
stat5 = Stats('name5', "2019-01-17 00:00:00", "A", 18.71)
stat6 = Stats('name6', "2019-01-10 00:00:00", "A", 17.78)
stat7 = Stats('name7', "2019-01-10 00:00:00", "A", 5.52)
stat8 = Stats('name8', "2019-01-10 00:00:00", "A", 9.91)
stat9 = Stats('name9', "2019-01-17 00:00:00", "B", 1.16)
stat10 = Stats('name10', "2019-01-17 00:00:00", "B", 12.0)

stat_lst = [stat1 , stat2, stat3, stat4, stat5, stat6, stat7, stat8, stat9, stat10]
df = spark.createDataFrame(stat_lst)

最佳答案

您可以尝试使用percentile_approx函数。

from pyspark.sql import Window
import pyspark.sql.functions as F

grp_window = Window.partitionBy('name')
# For median, i.e. 0.5 quantile
magic_percentile = F.expr('percentile_approx(val, 0.5)')

df.withColumn('pct_value', magic_percentile.over(grp_window))
# OR 
df.groupBy('name').agg(magic_percentile.alias('pct_value'))

您也可以使用percent_rank函数:

df.select('pct_value', percent_rank().over(w).alias("percentile"))\
    .where('percentile == 0.6').show()

您还可以传递一个百分位数数组,但这里的问题是您将获得一个列表作为返回:

quantiles = F.expr('percentile_approx(val, array(0.25, 0.5, 0.75))')

关于python - 从 PySpark 中的类别分布中查找值的百分位数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58404155/

相关文章:

python - 在 Numpy 中制作特殊的对角矩阵

python - 如何使用列表有条件地从 Pandas DataFrame 中删除重复项

apache-spark - 将 to_date 列与 pyspark 中的单个值进行比较

python - 更改数据类型但返回 Dataframe 的空值

apache-spark - 使用 pyspark 在数据 block 中实现 FileNotFound 异常

java - 如何从 hadoopish 文件夹加载 Parquet 文件

python - 如何抑制 "invalid value encountered in double_scalars"消息?

python - 如何从 .yaml 文件访问变量到机器人框架脚本?

python - 从现有数据框创建多索引

Python通过groupby进行求和运算,但排除非数字数据