python - PySpark:分组数据聚合中的自定义函数

标签 python sql dataframe pyspark

我有一个包含以下内容的 PySpark DataFrame

Row(id='id1', type='A', status='H', keywords=['k1', 'k2', 'k3'])

状态是一个二元选项('S'/'H')。 我需要做的是计算每个关键字每个类型、ID 和状态的状态 S 出现的比率。 比例将是

s/(s+h)

这里的 sh 是出现的地方。 因此,例如,如果关键字 k1 在类型 A 中作为 S 出现 2 次,作为 H 出现 3 次,我将在该类型和我的最终输出中获得 2/3理想情况下是

Row(id='id1', type='A', keyword='k1', ratio=0.66)

我认为这必须经过几个步骤,我很乐意计算 S 和 H 中的出现次数,然后创建更多的列来对两者进行比值。

但是,在按“id”、“type”和“status”运行 groupBy 之后,我如何计算上述事件?有没有一种方法可以运行带有自定义函数的agg

最佳答案

像这样的东西应该可以解决问题:

from pyspark.sql.functions import explode, avg, col

ratio = avg(
    # If status "S" then 1.0 else 0.0
    (col("status") == "S").cast("double")
 ).alias("ratio")

(df
    .withColumn("keyword", explode("keywords"))
    .groupBy("id", "type", "keyword")
    .agg(ratio))

关于python - PySpark:分组数据聚合中的自定义函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35989558/

相关文章:

python - 自定义排序python

sql - 如何在 Google Data Studio 中组合两个日期字段?

python - 在 Pandas 数据框中获得最小的一行

python - 如何对 pandas DataFrame 中的值进行二值化?

python - 在列表中的两个元素之间创建链接

python - 安装 anaconda - "failed to create anaconda menus"

sql - jpql INTERSECT 没有 INTERSECT

python - Django ORM 查询 - 如何从模型字段中减去时间?

python - 来自 pandas DataFrame 的热图 - 二维数组

python - 将一个大长方体切割成尺寸在一定范围内的随机小长方体