apache-spark - 如何迭代一个组并使用 Pyspark 创建一个数组列?

标签 apache-spark pyspark group-by apache-spark-sql

我有一个包含组和百分比的数据框

| Group | A % | B % | Target % |
| ----- | --- | --- | -------- |
| A     | .05 | .85 | 1.0      |
| A     | .07 | .75 | 1.0      |
| A     | .08 | .95 | 1.0      |
| B     | .03 | .80 | 1.0      |
| B     | .05 | .83 | 1.0      |
| B     | .04 | .85 | 1.0      |

我希望能够按 Group 列迭代 A % 列,并从列 B % 中找到一个值数组,当求和时A% 列中的每个值都小于或等于 Target % 列。

| Group | A % | B % | Target % | SumArray     |
| ----- | --- | --- | -------- | ------------ |
| A     | .05 | .85 | 1.0      | [.85,.75,.95]|
| A     | .07 | .75 | 1.0      | [.85,.75]    |
| A     | .08 | .95 | 1.0      | [.85,.75]   |
| B     | .03 | .80 | 1.0      | [.80,.83,.85]|
| B     | .05 | .83 | 1.0      | [.80,.83,.85]|
| B     | .04 | .85 | 1.0      | [.80,.83,.85]|

我希望能够使用 PySpark 来解决这个问题。有什么想法可以解决这个问题吗?

最佳答案

您可以使用 collect_list 函数获取 B % 的数组按 Group 分组的列值然后列 filter 使用您的条件生成的数组 A + B <= Target :

from pyspark.sql import Window
import pyspark.sql.functions as F

df2 = df.withColumn(
    "SumArray",
    F.collect_list(F.col("B")).over(Window.partitionBy("Group"))
).withColumn(
    "SumArray",
    F.expr("filter(SumArray, x -> x + A <= Target)")
)
df2.show()

# +-----+----+----+------+------------------+
# |Group|   A|   B|Target|          SumArray|
# +-----+----+----+------+------------------+
# |    B|0.03| 0.8|   1.0| [0.8, 0.83, 0.85]|
# |    B|0.05|0.83|   1.0| [0.8, 0.83, 0.85]|
# |    B|0.04|0.85|   1.0| [0.8, 0.83, 0.85]|
# |    A|0.05|0.85|   1.0|[0.85, 0.75, 0.95]|
# |    A|0.07|0.75|   1.0|      [0.85, 0.75]|
# |    A|0.08|0.95|   1.0|      [0.85, 0.75]|
# +-----+----+----+------+------------------+

关于apache-spark - 如何迭代一个组并使用 Pyspark 创建一个数组列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69845656/

相关文章:

apache-spark - SparkContext.addJar 在本地模式下不起作用

scala - Scala中的Spark单词计数(在Apache Sandbox中运行)

python - Spark join 抛出 'function' object has no attribute '_get_object_id' 错误。我该如何解决?

pyspark - AWS EMR 集群中的权限被拒绝 : user=zeppelin while using %spark. pyspark 解释器

apache-spark - 如何确保由 Spark DataFrame join 引起的分区?

python - Pandas GroupBy 并将唯一值的计数添加为新列

hadoop - 如何将 Hive 表转换为 MLlib LabeledPoint?

sql-server - PySpark 1.5 和 MSSQL jdbc

MySQL 连接优化 : Improving join type with derived tables and GROUP BY

mysql - "Error Code: 1111. Invalid use of group function"带有 JSON 函数