python - Pyspark 按列分组元素并创建字典

标签 python dataframe csv apache-spark pyspark

我以这种方式从 csv 文件中读取了一个 Spark Dataframe:

df = ss.read \
     .format("csv") \
     .option("delimiter", ";") \
     .option("header", "false") \
     .option("inferSchema", "true") \
     .option("escape", "\"") \
     .option("multiline", "true") \
     .option("wholeFile", "true") \
     .load(file_path)

Dataframe 是这样的:

|cod_cli|article_name|rank|
|123    |art_1       |1   |
|123    |art_2       |2   |
|123    |art_3       |3   |
|456    |art_4       |1   |
|456    |art_5       |2   |
|456    |art_6       |3   |

我想按 cod_cli 列对元素进行分组并创建多个列,一个用于分组集中的每个产品,一个字典键值作为值,键作为列名并将与该列名称相关的值作为值,如下所示:

|cod_cli|Product 1                  |Product 2                  |Product 3                  |
|123    |{cod_art : art_1, rank : 1}|{cod_art : art_2, rank : 2}|{cod_art : art_3, rank : 3}|
|456    |{cod_art : art_4, rank : 1}|{cod_art : art_5, rank : 2}|{cod_art : art_6, rank : 3}|

字典值可以是字符串(更好)或映射。 我这样试过:

df = df \
     .groupBy(F.col("cod_cli")) \
     .agg(F.collect_list(F.array("cod_art","rank")))

但通过这种方式,我创建了一个包含所有分组元素的数组列的列。

请问有人能帮帮我吗?

谢谢

更新

提出的解决方案是这个:

df = df.withColumn(
            "Product",
            F.to_json(
                F.struct(F.col("cod_art"), F.col("rank"))
            )
        )

通过这种方式,我使用所需的 json 字符串创建了一个“Product”列,例如 {cod_art : art_1, rank : 1}

然后:

df = df \
     .groupBy(F.col("cod_cli")) \
     .pivot("rank") \
     .agg(F.first("Product"))

这样,我可以为每个产品创建一个列,按 cod_cli 属性分组,并处理我有超过 3 个产品作为列的情况:

|cod_cli|1                          |2                          |3               
|123    |{cod_art : art_1, rank : 1}|{cod_art : art_2, rank : 2}|{cod_art : art_3, rank : 3}|
|456    |{cod_art : art_4, rank : 1}|{cod_art : art_5, rank : 2}|{cod_art : art_6, rank : 3}|

最佳答案

您可以在没有 pivot(昂贵的操作)的情况下使用 collect_list struct,然后是 to_jsoncreate_map .

from pyspark.sql import functions as F

df\
  .groupBy("cod_cli").agg(F.collect_list(F.struct("article_name","rank"))\
                          .alias("arr"))\
  .select("cod_cli", *(F.to_json(F.create_map(F.lit("cod_art"),(F.col("arr.article_name")[x]),F.lit("rank"),(F.col("arr.rank")[x])))\
                       .alias("Product{}".format(x+1)) for x in range(3)))\
  .show(truncate=False)

#+-------+------------------------------+------------------------------+------------------------------+
#|cod_cli|Product1                      |Product2                      |Product3                      |
#+-------+------------------------------+------------------------------+------------------------------+
#|123    |{"cod_art":"art_1","rank":"1"}|{"cod_art":"art_2","rank":"2"}|{"cod_art":"art_3","rank":"3"}|
#|456    |{"cod_art":"art_4","rank":"1"}|{"cod_art":"art_5","rank":"2"}|{"cod_art":"art_6","rank":"3"}|
#+-------+------------------------------+------------------------------+------------------------------+

关于python - Pyspark 按列分组元素并创建字典,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62391444/

相关文章:

python - 如何解决此功能,使其对程序中的所有选项都有效?

python - plotly 中的饼图

python - pyreverse 不绘制关系/箭头/连接

json - 在Golang中将JSON文件转换为CSV

R: read.csv 将字母 i 导入为 NA

python - Python Pillow 不支持保存到 EPS?

python - 将运行计数分配给 3 个 pandas 一组的新列

python - 查找所有组值为 nan 的行

python - 计算每行的列平均值,不包括计算平均值的行

javascript - 根据条件仅将所需的数据行从 CSV 文件传递​​到 Jmeter