apache-spark - PySpark "explode"列中的字典

标签 apache-spark pyspark explode

我在 spark 数据框中有一列“true_recoms”:

-RECORD 17----------------------------------------------------------------- 
item        | 20380109                                                                                                                                                                  
true_recoms | {"5556867":1,"5801144":5,"7397596":21}          

我需要“explode ”这个列来得到这样的东西:
item        | 20380109                                                                                                                                                                  
recom_item  | 5556867
recom_cnt   | 1
..............
item        | 20380109                                                                                                                                                                  
recom_item  | 5801144
recom_cnt   | 5
..............
item        | 20380109                                                                                                                                                                  
recom_item  | 7397596
recom_cnt   | 21

我试过使用 from_json 但它不起作用:
    schema_json = StructType(fields=[
        StructField("item", StringType()),
        StructField("recoms", StringType())
    ])
    df.select(col("true_recoms"),from_json(col("true_recoms"), schema_json)).show(5)

+--------+--------------------+------+
|    item|         true_recoms|true_r|
+--------+--------------------+------+
|31746548|{"32731749":3,"31...|   [,]|
|17359322|{"17359392":1,"17...|   [,]|
|31480894|{"31480598":1,"31...|   [,]|
| 7265665|{"7265891":1,"503...|   [,]|
|31350949|{"32218698":1,"31...|   [,]|
+--------+--------------------+------+
only showing top 5 rows

最佳答案

架构定义不正确。您声明为 struct有两个字符串字段

  • item
  • recoms

  • 而文档中不存在任何字段。

    不幸的是from_json只能返回结构或结构数组,因此将其重新定义为

    MapType(StringType(), LongType())
    

    不是一个选择。

    我个人会使用 udf
    from pyspark.sql.functions import udf, explode
    import json
    
    @udf("map<string, bigint>")
    def parse(s):
        try:
            return json.loads(s)
        except json.JSONDecodeError:
            pass 
    

    可以这样应用

    df = spark.createDataFrame(
        [(31746548, """{"5556867":1,"5801144":5,"7397596":21}""")],
        ("item", "true_recoms")
    )
    
    df.select("item",  explode(parse("true_recoms")).alias("recom_item", "recom_cnt")).show()
    # +--------+----------+---------+
    # |    item|recom_item|recom_cnt|
    # +--------+----------+---------+
    # |31746548|   5801144|        5|
    # |31746548|   7397596|       21|
    # |31746548|   5556867|        1|
    # +--------+----------+---------+
    

    关于apache-spark - PySpark "explode"列中的字典,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50683894/

    相关文章:

    sql - spark sql 是否像区分大小写?

    python - Pyspark 导入 Column 模块以使用 gt 或 geq

    python - Pyspark Cassandra - 为类型为 "struct"的 Spark 数据帧列创建正确的用户定义类型 (UDT)

    pyspark - 如何在 spark 中将时间戳列拆分为日期和时间

    PHP 生成 IP 范围

    xml - Spark Xml 读取包括文件名

    python - 在 pyspark 中找不到 col 函数

    php - 在 PHP 中回显 $_SERVER ['DOCUMENT_ROOT' ] 的结束部分或最后一个文件夹

    scala - 使用 Spark Scala 为每个分区添加一个常量值

    arrays - 配置单元查询以映射3个数组列的位置