python - 在使用 pyspark 和预定义的结构模式读取嵌套的 JSON 时,如何将缺失的列添加为 null

标签 python json apache-spark

python =3.6

Spark =2.4

我的示例 JSON 数据:

{"data":{"header":"someheader","body":{"name":"somename","value":"somevalue","books":[{"name":"somename"},{"value":"somevalue"},{"author":"someauthor"}]}}},
{"data":{"header":"someheader1","body":{"name":"somename1","value":"somevalue1","books":[{"name":"somename1"},{"value":"somevalue1"},{"author":"someauthor1"}]}}},....

我的结构模式:

Schema = StructType([StructField('header',StringType(),True),StructField('body',StructType([StructField('name1',StringType(),True),StructField('value',StringType(),True),StructField('books',ArrayType(StructType([StructField('name1',StringType(),True),StructField('value',StringType(),True),StructField('author',StringType(),True),StructField('publisher',StringType(),True)]),True),True)]),True)])

我想传递此架构并能够将所有字段(包括数据中缺少的字段)填充为 NULL。

因为,对于某一天的负载,可能会发生任何输入数据在结构字段的书籍数组中没有作者列。

因此,如果我不使用架构,spark 将无法推断该列,因为任何输入数据都没有它。

这是我尝试过的,

1> df = spark.read.schema(schema).json('/input/data/path')

这给了我所有的空行,因为输入文件在数据字段中有标题和正文,而结构模式中不存在数据字段

2> df = spark.read.json('/input/data/path').select(col("data.*")) df.coalesce(1).write.json('/输出/路径') df2 = spark.read.schema(schema).json('/输出/路径')

这也为我提供了所有空行,因为结构模式具有数据中不存在的额外列。

3> df = spark.read.json('/input/data/path').select(col("data.*")) df2 = spark.createDataFrame(df.rdd, schema)

这失败了,至于这个工作,列的顺序和所有嵌套列需要在数据和模式中完全相同,这是不可行的。

4> 在这种方法中,我尝试从输入中读取没有模式的数据并将其写回临时路径。

然后使用来自输入的模式再次读取数据,它为我提供所有空行,然后将空值替换为“1”,然后以追加模式写入相同的临时路径。

然后从这个临时路径再次读取并让 spark 推断模式。

但这也不起作用,因为嵌套的结构空列没有被非空值替换,当我写它时,输出路径没有所有列。

df.coalesce(1).write.json('/output/path')
df_input_with_schema = spark.read.schema(schema).json('/input/data/path') --all null rows
df_input_with_schema.na.fill('1').format('json').write.mode('append').save('/output/path')
df_final = spark.read.json('/output/path').filter(col("keycolumn") === 1)

有人可以帮忙吗?

最佳答案

从 spark 3 开始就有了 ignoreNullFields选项(默认为 True )

你可以这样做:

df.coalesce(1).write.mode('overwrite').json(ignoreNullFields=False,path="a")

保留只有空值的列

关于python - 在使用 pyspark 和预定义的结构模式读取嵌套的 JSON 时,如何将缺失的列添加为 null,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63870745/

相关文章:

python - 在 flask 中使用 HTTP 身份验证时的标准 401 响应

c# - 如何美化 JSON 以在 TextBox 中显示?

javascript - 将 json 对象从变量解析为类,其中 key = class

apache-spark - sc.parallelize 和 sc.textFile 有什么区别?

sql - Spark Dataframe 嵌套 Case When 语句

python - 如何在终端上运行 Python 脚本?

python - 与 Django 的惰性 psql 连接

python - Django-Tastypie : Omit one specific object to be serialized in ManyToMany

java - MapReduce到Spark

python - 在执行函数时进入 python 解释器