python - Spark : How to parse and transform json string from spark data frame rows

标签 python apache-spark pyspark apache-spark-sql

如何在 pyspark 中解析和转换 Spark 数据帧行中的 json 字符串?

我正在寻求如何解析的帮助:

  • json 字符串到 json 结构 output 1
  • 将 json 字符串转换为 a、b 和 id 列 output 2

背景:我通过 API json 字符串获取大量行( jstr1jstr2 、...),这些行保存到 Spark df 。我可以单独读取每一行的模式,但这不是解决方案,因为它非常慢,因为模式有大量行。每个jstr具有相同的架构,列/键 a 和 b 保持相同,只是 id并且列中的值发生变化。

编辑:使用 MapType 模式的 blackbishop 解决方案就像一个魅力 schema = "map<string, array<struct<a:int,b:int>>>"

问题延伸至: How to transform JSON string with multiple keys, from spark data frame rows in pyspark?

from pyspark.sql import Row
jstr1 = '{"id_1": [{"a": 1, "b": 2}, {"a": 3, "b": 4}]}'
jstr2 = '{"id_2": [{"a": 5, "b": 6}, {"a": 7, "b": 8}]}'
    
df = sqlContext.createDataFrame([Row(json=jstr1),Row(json=jstr2)])
    
schema = F.schema_of_json(df.select(F.col("json")).take(1)[0].json)
df2 = df.withColumn('json', F.from_json(F.col('json'), schema))
df2.show()

当前输出:

+--------------------+
|                json|
+--------------------+
|[[[1, 2], [3, 4]]]  |
|                  []|
+--------------------+

所需输出1:

+--------------------+-------+
|         json      |   id   |
+--------------------+-------+
|[[[1, 2], [3, 4]]] |   id_1 |
|[[[5, 6], [7, 8]]] |   id_2 |
+--------------------+-------+ 

所需输出2:

+---------+----------+-------+
|    a    |     b    |   id  |
+--------------------+-------+
|    1    |    2     |  id_1 |
|    3    |    4     |  id_1 |
|    5    |    6     |  id_2 |
|    7    |    8     |  id_2 |
+---------+----------+-------+
 

最佳答案

第二行的值为空,因为您仅使用第一行的架构,该架构与第二行不同。您可以将 JSON 解析为 MapType,其中键为字符串类型,值为结构数组类型:

schema = "map<string, array<struct<a:int,b:int>>>"

df = df.withColumn('json', F.from_json(F.col('json'), schema))

df.printSchema()
#root
# |-- json: map (nullable = true)
# |    |-- key: string
# |    |-- value: array (valueContainsNull = true)
# |    |    |-- element: struct (containsNull = true)
# |    |    |    |-- a: integer (nullable = true)
# |    |    |    |-- b: integer (nullable = true)

然后,通过一些简单的转换,您可以获得预期的输出:

  • id列代表映射中的键,您可以通过 map_keys 获得它功能
  • 结构 <a:int, b:int>表示您使用 map_values 获得的值功能
output1 = df.withColumn("id", F.map_keys("json").getItem(0)) \
            .withColumn("json", F.map_values("json").getItem(0))

output1.show(truncate=False)

# +----------------+----+
# |json            |id  |
# +----------------+----+
# |[[1, 2], [3, 4]]|id_1|
# |[[5, 6], [7, 8]]|id_2|
# +----------------+----+

output2 = output1.withColumn("attr", F.explode("json")) \
    .select("id", "attr.*")

output2.show(truncate=False)

# +----+---+---+
# |id  |a  |b  |
# +----+---+---+
# |id_1|1  |2  |
# |id_1|3  |4  |
# |id_2|5  |6  |
# |id_2|7  |8  |
# +----+---+---+

关于python - Spark : How to parse and transform json string from spark data frame rows,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65956766/

相关文章:

python - python 诅咒中的进度条

python - 如何读取二进制文件?

python - 获取tbody中tr的内容

scala - Spark : Is "count" on Grouped Data a Transformation or an Action?

apache-spark - 从 Titan(在 HBase 上)读取大图到 Spark

apache-spark - 将 Spark 结构化流数据帧与静态数据帧连接

python - 如何在 pyspark 中创建数据框的副本?

python - 如何在结束之前停止 python 脚本

scala - Spark 斯卡拉 : Check if string isn't null or empty

dataframe - pyspark:计算连续的 1/0 的数量,并在条纹太短/太长时更改它们