python - Pyspark:解析一列 json 字符串

标签 python json apache-spark pyspark

我有一个由一列组成的 pyspark 数据框,称为 json,其中每一行都是 json 的 unicode 字符串。我想解析每一行并返回一个新的数据框,其中每一行都是解析后的 json。

# Sample Data Frame
jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'
jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'
jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}'
df = sql_context.createDataFrame([Row(json=jstr1),Row(json=jstr2),Row(json=jstr3)])

我尝试使用 json.loads 映射每一行:

(df
  .select('json')
  .rdd
  .map(lambda x: json.loads(x))
  .toDF()
).show()

但这会返回 TypeError: expected string or buffer

我怀疑部分问题是从 dataframe 转换为 rdd 时,架构信息丢失了,所以我也尝试过手动输入架构信息:

schema = StructType([StructField('json', StringType(), True)])
rdd = (df
  .select('json')
  .rdd
  .map(lambda x: json.loads(x))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()

但我得到相同的 TypeError

看看 this answer ,看起来用 flatMap 展平行在这里可能有用,但我也没有成功:

schema = StructType([StructField('json', StringType(), True)])
rdd = (df
  .select('json')
  .rdd
  .flatMap(lambda x: x)
  .flatMap(lambda x: json.loads(x))
  .map(lambda x: x.get('body'))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()

我收到此错误:AttributeError: 'unicode' object has no attribute 'get'

最佳答案

对于Spark 2.1+,您可以使用from_json这允许在数据框中保留其他非 json 列,如下所示:

from pyspark.sql.functions import from_json, col
json_schema = spark.read.json(df.rdd.map(lambda row: row.json)).schema
df.withColumn('json', from_json(col('json'), json_schema))

您让 Spark 派生 json 字符串列的架构。那么df.json列不再是StringType,而是正确解码的json结构,即嵌套StrucTypedf的所有其他列> 保持原样。

可以通过如下方式访问json内容:

df.select(col('json.header').alias('header'))

关于python - Pyspark:解析一列 json 字符串,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41107835/

相关文章:

java - 如何仅使用 json-simple 从嵌套对象中的键获取值

java - 将 Json 的 Dataset 列解析为 Dataset<Row>

java - Spark中通过SWIFT从对象存储获取数据需要什么配置

python - 如何动态链接 Pyspark 中的条件?

python - SpaCy 3 变压器矢量标记对齐

python - 无法操作列表

C#如何解析没有键名的json数据?

php - array_udiff 返回不同的结果

python - 使用 ijson python 将 1.4 GB json 数据加载到 mysql

python - 计算 DataFrame 中各组的差异和均值