我有一个 Spark 数据框,需要将其写入 MongoDB。我想知道如何在 mongoDB 中将数据帧的某些列编写为嵌套/分层 JSON。 假设数据框有 6 列,col1, col2,.....col5, col6 我希望 col1、col2、col3 作为第一层次结构,其余列 col4 到 col6 作为第二层次结构。像这样的东西,
{
"col1": 123,
"col2": "abc",
"col3": 45,
"fields": {
"col4": "ert",
"col5": 45,
"col6": 56
}
}
如何在 pyspark 中实现这一目标?
最佳答案
在本例的内置函数中使用to_json + struct
。
示例:
df.show()
#+----+----+----+----+----+----+
#|col1|col2|col3|col4|col5|col6|
#+----+----+----+----+----+----+
#| 123| abc| 45| ert| 45| 56|
#+----+----+----+----+----+----+
from pyspark.sql.functions import *
df.withColumn("jsn",to_json(struct("col1","col2","col3",struct("col4","col5","col6").alias("fields")))).show(10,False)
#+----+----+----+----+----+----+---------------------------------------------------------------------------------------+
#|col1|col2|col3|col4|col5|col6|jsn |
#+----+----+----+----+----+----+---------------------------------------------------------------------------------------+
#|123 |abc |45 |ert |45 |56 |{"col1":"123","col2":"abc","col3":"45","fields":{"col4":"ert","col5":"45","col6":"56"}}|
#+----+----+----+----+----+----+---------------------------------------------------------------------------------------+
cols=df.columns
df.withColumn("jsn",to_json(struct("col1","col2","col3",struct("col4","col5","col6").alias("fields")))).drop(*cols).show(10,False)
#+---------------------------------------------------------------------------------------+
#|jsn |
#+---------------------------------------------------------------------------------------+
#|{"col1":"123","col2":"abc","col3":"45","fields":{"col4":"ert","col5":"45","col6":"56"}}|
#+---------------------------------------------------------------------------------------+
#using toJSON
df.withColumn("jsn",struct("col1","col2","col3",struct("col4","col5","col6").alias("fields"))).drop(*cols).toJSON().collect()
#[u'{"jsn":{"col1":"123","col2":"abc","col3":"45","fields":{"col4":"ert","col5":"45","col6":"56"}}}']
#to write as json file
df.withColumn("jsn",struct("col1","col2","col3",struct("col4","col5","col6").alias("fields"))).\
drop(*cols).\
write.\
format("json").\
save("<path>")
<小时/>
更新:
jsn
列表示为 json struct
df.withColumn("jsn",struct("col1","col2","col3",struct("col4","col5","col6").alias("fields"))).drop(*cols).printSchema()
#root
# |-- jsn: struct (nullable = false)
# | |-- col1: string (nullable = true)
# | |-- col2: string (nullable = true)
# | |-- col3: string (nullable = true)
# | |-- fields: struct (nullable = false)
# | | |-- col4: string (nullable = true)
# | | |-- col5: string (nullable = true)
# | | |-- col6: string (nullable = true)
关于mongodb - 在 Spark 中创建分层 JSON,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61335029/