python - PySpark 数据帧 : Find closest value and slice the DataFrame

标签 python apache-spark pyspark apache-spark-sql

所以,我已经做了足够的研究,但还没有找到解决我想做的事情的帖子。

我有一个 PySpark DataFrame my_df,它按 value排序-

+----+-----+                                                                    
|name|value|
+----+-----+
|   A|   30|
|   B|   25|
|   C|   20|
|   D|   18|
|   E|   18|
|   F|   15|
|   G|   10|
+----+-----+

value 列中所有计数的总和等于 136。我想获取所有 combined values >= x% of 136 的行。在此示例中,假设 x=80。然后 目标总和 = 0.8*136 = 108.8。因此,新的 DataFrame 将包含所有具有 combined value >= 108.8 的行。

在我们的示例中,这将归结为 D 行(因为组合值高达 D = 30+25+20+18 = 93)。

但是,困难的部分是我还想包括紧随其后的具有重复值的行。在这种情况下,我还想包括 E 行,因为它与 D 行具有相同的值,即 18

我想通过给 x 变量一个百分比来分割 my_df,例如上面讨论的 80。新的 DataFrame 应包含以下行-

+----+-----+                                                                    
|name|value|
+----+-----+
|   A|   30|
|   B|   25|
|   C|   20|
|   D|   18|
|   E|   18|
+----+-----+

我在这里可以做的一件事是遍历 DataFrame (大约 360k 行),但我想这违背了 Spark 的目的。

这里有我想要的简洁功能吗?

最佳答案

使用 pyspark SQL 函数可以简洁地执行此操作。

result = my_df.filter(my_df.value > target).select(my_df.name,my_df.value)
result.show()

编辑:基于 OP 的问题编辑 - 计算运行总和并获取行,直到达到目标值。请注意,这将导致行最多为 D,而不是 E..这似乎是一个奇怪的要求。

from pyspark.sql import Window
from pyspark.sql import functions as f

# Total sum of all `values`
target = (my_df.agg(sum("value")).collect())[0][0]

w = Window.orderBy(my_df.name) #Ideally this should be a column that specifies ordering among rows
running_sum_df = my_df.withColumn('rsum',f.sum(my_df.value).over(w))
running_sum_df.filter(running_sum_df.rsum <= 0.8*target)

关于python - PySpark 数据帧 : Find closest value and slice the DataFrame,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55383785/

相关文章:

python - nltk如何给出多个分隔的句子

python - Python 中是否有 MATLAB 函数 `mscohere` 的类似物?

scala - Spark 1.5 MlLib LDA - 获取新文档的主题分布

apache-spark - 为什么 spark-ml ALS 模型返回 NaN 和负数预测?

apache-spark - 尝试在 PySpark DataFrame 中创建具有最大时间戳的列

python - 使用 cuda 7.0 在 ubuntu 14.04 中安装 pycuda

python - 如何将嵌套未知层数的新 JSON 节点添加到现有 JSON 文件中?

apache-spark - Spark 连接执行器失败

python - 如何使用 Python 连接 HBase 和 Spark?

python - Pyspark:以表格格式显示 Spark 数据框