python - 获取 Pyspark 中缺失评级的评级列表列,其中 0 已到位

标签 python apache-spark pyspark

我有一个如下所示的数据框,我想添加一个按 id 分组的列“ratings_list”,并将评级放入列表中,其中列表索引是项目编号

id | item | rating
1  | 1    | 5
1  | 2    | 4
1  | 4    | 5
1  | 7    | 3
2  | 5    | 3
2  | 2    | 5
2  | 3    | 5

理想情况下会导致

id | rating_list
1  | [5,4,0,5,0,0,3]
2  | [0,5,5,0,3,0,0]

其中 rating_list 的长度是数据框中不同项目的数量。到目前为止,我有一个包含项目列表和评级列表的数据框,但我不确定这是否是适当的中间步骤

id | item_list | rating_list
1  | [1,2,4,7] | [5,4,5,3]
2  | [2,3,5]   | [5,5,3]

这将是一个巨大的数据框,所以我更喜欢更快的东西。

最佳答案

这是另一个基于观察的解决方案,max(item) == max_array_length,如果假设无效,请告诉我。

from pyspark.sql.functions import expr, collect_list, min, max, sequence, lit

# max item implies max array length
maxi = df.select(max("item").alias("maxi")).first()["maxi"]

df = df.groupBy("id").agg( \
      collect_list("item").alias("items"),
      collect_list("rating").alias("ratings")
).withColumn("idx", sequence(lit(1), lit(maxi)))

# we are projecting an array[K] into array[N] where K <= N 
rating_expr = expr("""transform(idx, i -> if(array_position(items, i) >= 1, 
                                                 ratings[array_position(items, i) - 1], 
                                                 0))""")

df.select(df.id, rating_expr.alias("rating_list"))

# +---+---------------------+
# |id |rating_list          |
# +---+---------------------+
# |1  |[5, 4, 0, 5, 0, 0, 3]|
# |2  |[0, 5, 5, 0, 3, 0, 0]|
# +---+---------------------+

分析:迭代idx,如果当前项目(即i)存在于items中,则使用其位置通过 ratings[array_position(items)从评级中检索项目, i) - 1],否则 0。

关于python - 获取 Pyspark 中缺失评级的评级列表列,其中 0 已到位,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62329722/

相关文章:

apache-spark - pyspark ml错误-u'要求失败: Cannot have an empty string for name'

python - 有没有办法将结果流式传输到驱动程序,而无需等待所有分区完成执行?

python - 使用来自 StringIndexer 的标签进行 IndexToString 转换

Python 客户端支持在 Amazon EMR 上运行 Hive

python - 展开嵌套的 Python 字典

python - 用 Python 从许多 Google 搜索中抓取链接

scala - 在 Spark Streaming 中重用 kafka producer

python - 打开包含未定义字符的文件(csv.gz)并将文件传递给函数

scala - 如果列表中存在,则从列中删除单词

apache-spark - 无法加载 Parquet 文件(不支持 Parquet 类型 : INT32 (UINT_8);) with pyspark