python =3.6
Spark =2.4
我的示例 JSON 数据:
{"data":{"header":"someheader","body":{"name":"somename","value":"somevalue","books":[{"name":"somename"},{"value":"somevalue"},{"author":"someauthor"}]}}},
{"data":{"header":"someheader1","body":{"name":"somename1","value":"somevalue1","books":[{"name":"somename1"},{"value":"somevalue1"},{"author":"someauthor1"}]}}},....
我的结构模式:
Schema = StructType([StructField('header',StringType(),True),StructField('body',StructType([StructField('name1',StringType(),True),StructField('value',StringType(),True),StructField('books',ArrayType(StructType([StructField('name1',StringType(),True),StructField('value',StringType(),True),StructField('author',StringType(),True),StructField('publisher',StringType(),True)]),True),True)]),True)])
我想传递此架构并能够将所有字段(包括数据中缺少的字段)填充为 NULL。
因为,对于某一天的负载,可能会发生任何输入数据在结构字段的书籍数组中没有作者列。
因此,如果我不使用架构,spark 将无法推断该列,因为任何输入数据都没有它。
这是我尝试过的,
1> df = spark.read.schema(schema).json('/input/data/path')
这给了我所有的空行,因为输入文件在数据字段中有标题和正文,而结构模式中不存在数据字段
2> df = spark.read.json('/input/data/path').select(col("data.*")) df.coalesce(1).write.json('/输出/路径') df2 = spark.read.schema(schema).json('/输出/路径')
这也为我提供了所有空行,因为结构模式具有数据中不存在的额外列。
3> df = spark.read.json('/input/data/path').select(col("data.*")) df2 = spark.createDataFrame(df.rdd, schema)
这失败了,至于这个工作,列的顺序和所有嵌套列需要在数据和模式中完全相同,这是不可行的。
4> 在这种方法中,我尝试从输入中读取没有模式的数据并将其写回临时路径。
然后使用来自输入的模式再次读取数据,它为我提供所有空行,然后将空值替换为“1”,然后以追加模式写入相同的临时路径。
然后从这个临时路径再次读取并让 spark 推断模式。
但这也不起作用,因为嵌套的结构空列没有被非空值替换,当我写它时,输出路径没有所有列。
df.coalesce(1).write.json('/output/path')
df_input_with_schema = spark.read.schema(schema).json('/input/data/path') --all null rows
df_input_with_schema.na.fill('1').format('json').write.mode('append').save('/output/path')
df_final = spark.read.json('/output/path').filter(col("keycolumn") === 1)
有人可以帮忙吗?
最佳答案
从 spark 3 开始就有了 ignoreNullFields选项(默认为 True )
你可以这样做:
df.coalesce(1).write.mode('overwrite').json(ignoreNullFields=False,path="a")
保留只有空值的列
关于python - 在使用 pyspark 和预定义的结构模式读取嵌套的 JSON 时,如何将缺失的列添加为 null,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63870745/