我有一个由一列组成的 pyspark 数据框,称为 json
,其中每一行都是 json 的 unicode 字符串。我想解析每一行并返回一个新的数据框,其中每一行都是解析后的 json。
# Sample Data Frame
jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'
jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'
jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}'
df = sql_context.createDataFrame([Row(json=jstr1),Row(json=jstr2),Row(json=jstr3)])
我尝试使用 json.loads
映射每一行:
(df
.select('json')
.rdd
.map(lambda x: json.loads(x))
.toDF()
).show()
但这会返回 TypeError: expected string or buffer
我怀疑部分问题是从 dataframe
转换为 rdd
时,架构信息丢失了,所以我也尝试过手动输入架构信息:
schema = StructType([StructField('json', StringType(), True)])
rdd = (df
.select('json')
.rdd
.map(lambda x: json.loads(x))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()
但我得到相同的 TypeError
。
看看 this answer ,看起来用 flatMap
展平行在这里可能有用,但我也没有成功:
schema = StructType([StructField('json', StringType(), True)])
rdd = (df
.select('json')
.rdd
.flatMap(lambda x: x)
.flatMap(lambda x: json.loads(x))
.map(lambda x: x.get('body'))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()
我收到此错误:AttributeError: 'unicode' object has no attribute 'get'
。
最佳答案
对于Spark 2.1+,您可以使用from_json
这允许在数据框中保留其他非 json 列,如下所示:
from pyspark.sql.functions import from_json, col
json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema
df.withColumn('json', from_json(col('json'), json_schema))
您让 Spark 派生 json 字符串列的架构。那么df.json
列不再是StringType,而是正确解码的json结构,即嵌套StrucType
和df
的所有其他列> 保持原样。
可以通过如下方式访问json内容:
df.select(col('json.header').alias('header'))
关于python - Pyspark:解析一列 json 字符串,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41107835/