python - Converting multiple arrays of struct columns in pyspark sql

Tags: python struct pyspark

I have a PySpark dataframe with multiple columns (around 30) of nested structs that I want to write to CSV.

In order to do that, I want to stringify all of the struct columns.

I've checked several answers here:

Pyspark converting an array of struct into string

PySpark: DataFrame - Convert Struct to Array

PySpark convert struct field inside array to string

This is the schema of my dataframe (with around 30 complex keys):

root  
 |-- 1_simple_key: string (nullable = true)  
 |-- 2_simple_key: string (nullable = true)  
 |-- 3_complex_key: struct (nullable = true)  
 |    |-- n1: string (nullable = true)  
 |    |-- n2: struct (nullable = true)  
 |    |    |-- n3: boolean (nullable = true)  
 |    |    |-- n4: boolean (nullable = true)  
 |    |    |-- n5: boolean (nullable = true)  
 |    |-- n6: long (nullable = true)  
 |    |-- n7: long (nullable = true)  
 |-- 4_complex_key: struct (nullable = true)  
 |    |-- n1: string (nullable = true)  
 |    |-- n2: struct (nullable = true)  
 |    |    |-- n3: boolean (nullable = true)  
 |    |    |-- n4: boolean (nullable = true)  
 |    |    |-- n5: boolean (nullable = true)  
 |    |-- n6: long (nullable = true)  
 |    |-- n7: long (nullable = true)  
 |-- 5_complex_key: struct (nullable = true)  
 |    |-- n1: string (nullable = true)  
 |    |-- n2: struct (nullable = true)  
 |    |    |-- n3: boolean (nullable = true)  
 |    |    |-- n4: boolean (nullable = true)  
 |    |    |-- n5: boolean (nullable = true)  
 |    |-- n6: long (nullable = true)  
 |    |-- n7: long (nullable = true)  

The suggested solutions work for a single column, and I wasn't able to apply them to multiple columns.

I want to do something of this sort:
1. for each struct_column:
2. col = stringify(struct_column)

I don't mind creating an additional dataframe for it. I just need it to be ready for the CSV write.
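A minimal sketch of that loop (my reading of the idea, assuming s_df is the dataframe from the reproducible example below and using to_json as the stringify function, which the accepted answer formalizes):

from pyspark.sql import functions as F

# Replace every struct column in place with its JSON-string form.
struct_cols = [f.name for f in s_df.schema.fields if f.dataType.typeName() == 'struct']
for c in struct_cols:
    s_df = s_df.withColumn(c, F.to_json(F.col(c)))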

Minimal reproducible example:

import pandas as pd
from pyspark.sql import Row

d = {'1_complex_key': {0: Row(type='1_complex_key', s=Row(n1=False, n2=False, n3=True), x=954, y=238), 1: Row(type='1_complex_key', s=Row(n1=False, n2=False, n3=True), x=956, y=250), 2: Row(type='1_complex_key', s=Row(n1=True, n2=False, n3=False), x=886, y=269)}, '2_complex_key': {0: Row(type='2_complex_key', s=Row(n1=False, n2=False, n3=True), x=901, y=235), 1: Row(type='2_complex_key', s=Row(n1=False, n2=False, n3=True), x=905, y=249), 2: Row(type='2_complex_key', s=Row(n1=False, n2=False, n3=True), x=868, y=270)}, '3_complex_key': {0: Row(type='3_complex_key', s=Row(n1=True, n2=False, n3=False), x=925, y=197), 1: Row(type='3_complex_key', s=Row(n1=False, n2=False, n3=True), x=928, y=206), 2: Row(type='3_complex_key', s=Row(n1=False, n2=False, n3=True), x=883, y=236)}}
df = pd.DataFrame.from_dict(d)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
s_df = spark.createDataFrame(df)
s_df.printSchema()
s_df.write.csv('it_doesnt_write.csv')


So, to summarize: I have a Spark dataframe that I want to write to CSV, and I can't write it to CSV because:

'CSV data source does not support struct<s:struct<n1:boolean,n2:boolean,n3:boolean>,type:string,x:bigint,y:bigint> data type.;'

So I want to perform some operation (a reversible transformation) on this dataframe so that I can write it to CSV, and later read the CSV back into a Spark dataframe with the same schema.

How can I do that? Thanks.

Best Answer

As pault already mentioned in the comments, you need a list comprehension. Such a list comprehension requires a list of columns and a function that converts those columns to strings. I will use df.columns and to_json, but you could also provide your own Python list of column names and a custom function to stringify your complex columns.

from pyspark.sql import functions as F

# This converts all columns to JSON strings
# and writes them to disk.
s_df.select([F.to_json(x) for x in s_df.columns]).coalesce(1).write.csv('/tmp/testcsv')
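Note (my addition, not part of the original answer): if you want the original column names to survive the CSV roundtrip, you could alias each stringified column and write a header row; the rest of this answer assumes the default names (_c0, _c1, ...).

# Hypothetical variant: alias each column and write a header so names persist.
s_df.select([F.to_json(x).alias(x) for x in s_df.columns]).coalesce(1).write.csv('/tmp/testcsv_named', header=True)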

In case you don't want to apply to_json to all columns, you can simply modify it like this:

list4tojson = ['2_complex_key', '3_complex_key']
s_df.select('1_complex_key', *[F.to_json(x) for x in list4tojson]).coalesce(1).write.csv('/tmp/testcsv')

You can restore the dataframe with from_json:

df = spark.read.csv('/tmp/testcsv')
df.printSchema()
#root
# |-- _c0: string (nullable = true)
# |-- _c1: string (nullable = true)
# |-- _c2: string (nullable = true)

# Inferring the schema from the first JSON column
json_schema = spark.read.json(df.rdd.map(lambda row: row._c0)).schema

df.select([F.from_json(x, json_schema) for x in df.columns]).printSchema()
#root
# |-- jsontostructs(_c0): struct (nullable = true)
# |    |-- s: struct (nullable = true)
# |    |    |-- n1: boolean (nullable = true)
# |    |    |-- n2: boolean (nullable = true)
# |    |    |-- n3: boolean (nullable = true)
# |    |-- type: string (nullable = true)
# |    |-- x: long (nullable = true)
# |    |-- y: long (nullable = true)
# |-- jsontostructs(_c1): struct (nullable = true)
# |    |-- s: struct (nullable = true)
# |    |    |-- n1: boolean (nullable = true)
# |    |    |-- n2: boolean (nullable = true)
# |    |    |-- n3: boolean (nullable = true)
# |    |-- type: string (nullable = true)
# |    |-- x: long (nullable = true)
# |    |-- y: long (nullable = true)
# |-- jsontostructs(_c2): struct (nullable = true)
# |    |-- s: struct (nullable = true)
# |    |    |-- n1: boolean (nullable = true)
# |    |    |-- n2: boolean (nullable = true)
# |    |    |-- n3: boolean (nullable = true)
# |    |-- type: string (nullable = true)
# |    |-- x: long (nullable = true)
# |    |-- y: long (nullable = true)
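Note that the schema is inferred once from _c0 and reused for every column, which works here because all three columns share the same structure. If your columns had different schemas, you would need to infer one per column; a minimal sketch of that (my assumption, not part of the original answer):

# Hypothetical per-column schema inference, for columns with different shapes.
schemas = {c: spark.read.json(df.rdd.map(lambda row, c=c: row[c])).schema for c in df.columns}
df.select([F.from_json(c, schemas[c]).alias(c) for c in df.columns]).printSchema()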

In case you just want to store your data in a readable format, you can avoid all of the above code by writing it to JSON directly:

s_df.coalesce(1).write.json('/tmp/testjson')

df = spark.read.json('/tmp/testjson')
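As a quick sanity check (my addition), the nested fields are directly queryable again after the JSON roundtrip:

# Hypothetical check: dot notation reaches into the restored structs.
df.select(F.col('1_complex_key.x'), F.col('1_complex_key.s.n1')).show(3)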

Regarding python - converting multiple arrays of struct columns in pyspark sql, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/58595189/
