python - 使用 pyspark 将结构数组转换为列 - 不分解数组

标签 python apache-spark pyspark apache-spark-sql

我目前有一个带有 id 和一列的数据框,该列是结构数组:

 root
 |-- id: string (nullable = true)
 |-- lists: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: string (nullable = true)
 |    |    |-- _2: string (nullable = true)

这是一个包含数据的示例表:

 id | list1             | list2
 ------------------------------------------
 1  | [[a, av], [b, bv]]| [[e, ev], [f,fv]]
 2  | [[c, cv]]         | [[g,gv]]

如何将上面的数据框转换为下面的数据框?我需要“分解”数组并根据结构中的第一个值添加列。

 id | a   | b   | c   | d   | e  | f  | g  
 ----------------------------------------
 1  | av  | bv  | null| null| ev | fv | null
 2  | null| null| cv  | null|null|null|gv
<小时/>

创建数据框的 pyspark 代码如下:

d1 = spark.createDataFrame([("1", [("a","av"),("b","bv")], [("e", "ev"), ("f", "fv")]), \
                                    ("2", [("c", "cv")],  [("g", "gv")])], ["id","list1","list2"])

注意:我的 Spark 版本为 2.2.0,因此某些 sql 函数无法工作,例如 concat_map 等。

最佳答案

您可以使用高阶函数来完成此操作,而无需分解数组,例如:

d1.select('id',
          f.when(f.size(f.expr('''filter(list1,x->x._1='a')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list1,x->x._1='a'),value->value._2)'''))).alias('a'),\
          f.when(f.size(f.expr('''filter(list1,x->x._1='b')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list1,x->x._1='b'),value->value._2)'''))).alias('b'),\
          f.when(f.size(f.expr('''filter(list1,x->x._1='c')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list1,x->x._1='c'),value->value._2)'''))).alias('c'),\
          f.when(f.size(f.expr('''filter(list1,x->x._1='d')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list1,x->x._1='d'),value->value._2)'''))).alias('d'),\
          f.when(f.size(f.expr('''filter(list2,x->x._1='e')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list2,x->x._1='e'),value->value._2)'''))).alias('e'),\
          f.when(f.size(f.expr('''filter(list2,x->x._1='f')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list2,x->x._1='f'),value->value._2)'''))).alias('f'),\
          f.when(f.size(f.expr('''filter(list2,x->x._1='g')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list2,x->x._1='g'),value->value._2)'''))).alias('g'),\
          f.when(f.size(f.expr('''filter(list2,x->x._1='h')'''))>0,f.concat_ws(',',f.expr('''transform(filter(list2,x->x._1='h'),value->value._2)'''))).alias('h')\
          ).show()


+---+----+----+----+----+----+----+----+----+
| id|   a|   b|   c|   d|   e|   f|   g|   h|
+---+----+----+----+----+----+----+----+----+
|  1|  av|  bv|null|null|  ev|  fv|null|null|
|  2|null|null|  cv|null|null|null|  gv|null|
+---+----+----+----+----+----+----+----+----+

希望对你有帮助

关于python - 使用 pyspark 将结构数组转换为列 - 不分解数组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62078544/

相关文章:

python - 新手 python - 在数组中替换

python - 如何查看生成到 django 模板变量中的异常?

hadoop - 为什么 Spark 以不同的方式解释这两个查询?

azure - Pyspark Azure Blob 存储 - 未找到类 org.apache.hadoop.fs.azure.NativeAzureFileSystem

python:仅当字符在此列表中时才保留字符

python - 如何在 Windows 上安装 xmlsec? (pip install xmlsec 失败)

azure - 如何在 ADF 中运行 Spark 作业?

python - 在 PySpark 中使用微秒时间戳

python - Spark(Python)中的 Kolmogorov Smirnov 测试不起作用?

apache-spark - 具有两个日期列的 Spark 时间序列查询