python - pyspark:将字典数组转换为新列

标签 python apache-spark pyspark apache-spark-sql

我正在努力转换我的 pyspark 数据框,如下所示:

df = spark.createDataFrame([('0018aad4',[300, 450], ['{"v1": "blue"}', '{"v2": "red"}']), ('0018aad5',[300], ['{"v1": "blue"}'])],[ "id","Tlist", 'Tstring'])
df.show(2, False)

+--------+----------+-------------------------------+
|id      |Tlist     |Tstring                        |
+--------+----------+-------------------------------+
|0018aad4|[300, 450]|[{"v1": "blue"}, {"v2": "red"}]|
|0018aad5|[300]     |[{"v1": "blue"}]               |
+--------+----------+-------------------------------+

对此:

df_result = spark.createDataFrame([('0018aad4',[300, 450], 'blue', 'red'), ('0018aad5',[300], 'blue', None)],[ "id","Tlist", 'v1', 'v2'])
df_result.show(2, False)

+--------+----------+----+----+
|id      |Tlist     |v1  |v2  |
+--------+----------+----+----+
|0018aad4|[300, 450]|blue|red |
|0018aad5|[300]     |blue|null|
+--------+----------+----+----+

我尝试过旋转和其他一些事情,但没有得到上面的结果。

请注意,我在Tstring列中没有确切的字典数量

你知道我该怎么做吗?

最佳答案

使用transform函数,您可以将数组的每个元素转换为 map 类型。之后,您可以使用aggregate函数获取一张 map ,将其分解,然后旋转键以获得所需的输出:

from pyspark.sql import functions as F

df1 = df.withColumn(
    "Tstring",
    F.transform("Tstring", lambda x: F.from_json(x, "map<string,string>"))
).withColumn(
    "Tstring",
    F.aggregate(
        F.expr("slice(Tstring, 2, size(Tstring))"), 
        F.col("Tstring")[0], 
        lambda acc, x: F.map_concat(acc, x)
    )
).select(
    "id", "Tlist", F.explode("Tstring")
).groupby(
    "id", "Tlist"
).pivot("key").agg(F.first("value"))


df1.show()
#+--------+----------+----+----+
#|id      |Tlist     |v1  |v2  |
#+--------+----------+----+----+
#|0018aad4|[300, 450]|blue|red |
#|0018aad5|[300]     |blue|null|
#+--------+----------+----+----+

我使用的是 Spark 3.1+,因此 dataframe API 中提供了诸如 transform 之类的高阶函数,但您可以使用 Spark expr 执行相同的操作 < 3.1:

df1 = (df.withColumn("Tstring", F.expr("transform(Tstring, x-> from_json(x, 'map<string,string>'))"))
       .withColumn("Tstring", F.expr("aggregate(slice(Tstring, 2, size(Tstring)), Tstring[0], (acc, x) -> map_concat(acc, x))"))
       .select("id", "Tlist", F.explode("Tstring"))
       .groupby("id", "Tlist")
       .pivot("key")
       .agg(F.first("value"))
       )

关于python - pyspark:将字典数组转换为新列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72305063/

相关文章:

python - 如何在 Spark 中关闭 INFO 日志记录?

Python/Selenium 网络驱动程序。在页面上查找一个元素并打印/返回它的 xpath

java - 如何从 Spark DataSet 中删除记录

apache-spark - 聚合函数 Pyspark Dataframe 中的错误

java - 如何使用 spark 处理一系列 hbase 行?

python - PySpark 可以使用 numpy 数组吗?

python - ORA-01861 : literal does not match format string error on char variable

python - 将长字符串分成 513 个字符 block | Python 3.3

python - Virtualenv、无站点包、sys.path

apache-spark - Spark : Using null checking in a CASE WHEN expression to protect against type errors