mongodb - Creating hierarchical JSON in Spark

Tags: mongodb apache-spark pyspark

I have a Spark DataFrame that I need to write to MongoDB. I would like to know how to write some of the DataFrame's columns to MongoDB as nested/hierarchical JSON. Say the DataFrame has 6 columns: col1, col2, ..., col5, col6. I want col1, col2, col3 as the first level of the hierarchy and the remaining columns col4 to col6 as the second level. Something like this:

{
    "col1": 123,
    "col2": "abc",
    "col3": 45,
    "fields": {
        "col4": "ert",
        "col5": 45,
        "col6": 56
    }
}

How can I achieve this in pyspark?

Best Answer

Use the to_json + struct built-in functions for this case.

Example:

df.show()                                                                                                         
#+----+----+----+----+----+----+
#|col1|col2|col3|col4|col5|col6|
#+----+----+----+----+----+----+
#| 123| abc|  45| ert|  45|  56|
#+----+----+----+----+----+----+
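For reproducibility, here is a minimal setup sketch (my assumption; not part of the original answer) that builds this one-row sample DataFrame. All six columns are strings, which matches the quoted values in the JSON output below:

from pyspark.sql import SparkSession

# assumed setup: one-row sample DataFrame with string columns
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("123", "abc", "45", "ert", "45", "56")],
    ["col1", "col2", "col3", "col4", "col5", "col6"],
)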

from pyspark.sql.functions import *
df.withColumn("jsn",to_json(struct("col1","col2","col3",struct("col4","col5","col6").alias("fields")))).show(10,False)
#+----+----+----+----+----+----+---------------------------------------------------------------------------------------+
#|col1|col2|col3|col4|col5|col6|jsn                                                                                    |
#+----+----+----+----+----+----+---------------------------------------------------------------------------------------+
#|123 |abc |45  |ert |45  |56  |{"col1":"123","col2":"abc","col3":"45","fields":{"col4":"ert","col5":"45","col6":"56"}}|
#+----+----+----+----+----+----+---------------------------------------------------------------------------------------+

cols=df.columns

df.withColumn("jsn",to_json(struct("col1","col2","col3",struct("col4","col5","col6").alias("fields")))).drop(*cols).show(10,False)
#+---------------------------------------------------------------------------------------+
#|jsn                                                                                    |
#+---------------------------------------------------------------------------------------+
#|{"col1":"123","col2":"abc","col3":"45","fields":{"col4":"ert","col5":"45","col6":"56"}}|
#+---------------------------------------------------------------------------------------+

#using toJSON
df.withColumn("jsn",struct("col1","col2","col3",struct("col4","col5","col6").alias("fields"))).drop(*cols).toJSON().collect()
#[u'{"jsn":{"col1":"123","col2":"abc","col3":"45","fields":{"col4":"ert","col5":"45","col6":"56"}}}']

#to write as json file
df.withColumn("jsn",struct("col1","col2","col3",struct("col4","col5","col6").alias("fields"))).\
drop(*cols).\
write.\
format("json").\
save("<path>")

Update:

The jsn column is represented as a json struct:

df.withColumn("jsn",struct("col1","col2","col3",struct("col4","col5","col6").alias("fields"))).drop(*cols).printSchema()
#root
# |-- jsn: struct (nullable = false)
# |    |-- col1: string (nullable = true)
# |    |-- col2: string (nullable = true)
# |    |-- col3: string (nullable = true)
# |    |-- fields: struct (nullable = false)
# |    |    |-- col4: string (nullable = true)
# |    |    |-- col5: string (nullable = true)
# |    |    |-- col6: string (nullable = true)
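
When such a DataFrame is written through the MongoDB Spark Connector, a struct column like fields above maps to an embedded document, which is exactly the nested shape requested in the question.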

This Q&A on mongodb - Creating hierarchical JSON in Spark is based on the following Stack Overflow question: https://stackoverflow.com/questions/61335029/
