我以这种方式从 csv 文件中读取了一个 Spark Dataframe:
df = ss.read \
.format("csv") \
.option("delimiter", ";") \
.option("header", "false") \
.option("inferSchema", "true") \
.option("escape", "\"") \
.option("multiline", "true") \
.option("wholeFile", "true") \
.load(file_path)
Dataframe 是这样的:
|cod_cli|article_name|rank|
|123 |art_1 |1 |
|123 |art_2 |2 |
|123 |art_3 |3 |
|456 |art_4 |1 |
|456 |art_5 |2 |
|456 |art_6 |3 |
我想按 cod_cli 列对元素进行分组并创建多个列,一个用于分组集中的每个产品,一个字典键值作为值,键作为列名并将与该列名称相关的值作为值,如下所示:
|cod_cli|Product 1 |Product 2 |Product 3 |
|123 |{cod_art : art_1, rank : 1}|{cod_art : art_2, rank : 2}|{cod_art : art_3, rank : 3}|
|456 |{cod_art : art_4, rank : 1}|{cod_art : art_5, rank : 2}|{cod_art : art_6, rank : 3}|
字典值可以是字符串(更好)或映射。 我这样试过:
df = df \
.groupBy(F.col("cod_cli")) \
.agg(F.collect_list(F.array("cod_art","rank")))
但通过这种方式,我创建了一个包含所有分组元素的数组列的列。
请问有人能帮帮我吗?
谢谢
更新
提出的解决方案是这个:
df = df.withColumn(
"Product",
F.to_json(
F.struct(F.col("cod_art"), F.col("rank"))
)
)
通过这种方式,我使用所需的 json 字符串创建了一个“Product”列,例如 {cod_art : art_1, rank : 1}
。
然后:
df = df \
.groupBy(F.col("cod_cli")) \
.pivot("rank") \
.agg(F.first("Product"))
这样,我可以为每个产品创建一个列,按 cod_cli 属性分组,并处理我有超过 3 个产品作为列的情况:
|cod_cli|1 |2 |3
|123 |{cod_art : art_1, rank : 1}|{cod_art : art_2, rank : 2}|{cod_art : art_3, rank : 3}|
|456 |{cod_art : art_4, rank : 1}|{cod_art : art_5, rank : 2}|{cod_art : art_6, rank : 3}|
最佳答案
您可以在没有 pivot
(昂贵的操作)的情况下使用 collect_list
struct
,然后是 to_json
和 create_map
.
from pyspark.sql import functions as F
df\
.groupBy("cod_cli").agg(F.collect_list(F.struct("article_name","rank"))\
.alias("arr"))\
.select("cod_cli", *(F.to_json(F.create_map(F.lit("cod_art"),(F.col("arr.article_name")[x]),F.lit("rank"),(F.col("arr.rank")[x])))\
.alias("Product{}".format(x+1)) for x in range(3)))\
.show(truncate=False)
#+-------+------------------------------+------------------------------+------------------------------+
#|cod_cli|Product1 |Product2 |Product3 |
#+-------+------------------------------+------------------------------+------------------------------+
#|123 |{"cod_art":"art_1","rank":"1"}|{"cod_art":"art_2","rank":"2"}|{"cod_art":"art_3","rank":"3"}|
#|456 |{"cod_art":"art_4","rank":"1"}|{"cod_art":"art_5","rank":"2"}|{"cod_art":"art_6","rank":"3"}|
#+-------+------------------------------+------------------------------+------------------------------+
关于python - Pyspark 按列分组元素并创建字典,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62391444/