我需要一些帮助,以将一系列 JSON 文件从 S3 存储桶导入 PySpark DataFrame。
这个桶中的文件都有一个.json
扩展名,但遗憾的是不符合Spark通常的每行一个JSON对象的要求,相反,它们都在里面的一行上方括号。
所以代替:
{"first_column": 12, "second_column": {"nested_column": "value"}}
{"first_column": 24, "second_column": {"nested_column": "value2"}}
我有:
[{"first_column": 12, "second_column": {"nested_column": "value"}},{"first_column": 24, "second_column": {"nested_column": "value2"}}]
我们实际上收到了这种格式的文件,而且文件太多了,很遗憾,手动调整是不可行的。
目前我尝试过的方法如下:
我尝试使用 spark.read.json
方法,使用以下语法和通配符 *
一次加载多个文件。在这种情况下 spark
是 sqlContext
df = spark.read.json("s3://path_to_bucket_*.json")
这会在不引发任何错误或警告的情况下运行,并返回所需的架构:
df.printSchema()
root
|-- first_column: long (nullable = true)
|-- second_column: struct (nullable = true)
| |-- nested_column: string (nullable = true)
但是,当我尝试查看数据时,我得到以下信息:
+------------+-------------+
|first_column|second_column|
+------------+-------------+
| null| null|
+------------+-------------+
我找到了一种从 Databricks 实际加载数据的方法 here它使用 Spark 上下文 sc
读取 Paired RDD,如下所示:
dfRDD = sc.wholeTextFiles("s3://path_to_bucket_*.json")
这将返回一个带有文件名和文件主体的 PairedRDD。然而,真正让我感到困惑的是,当我使用来自该 RDD 的正文信息调用以下行时,它工作正常并且根本没有空值:
df = spark.read.json(dfRDD.map(lambda x: x[1]))
所以,我很困惑为什么会这样,因为我认为这是输入函数的相同信息,因为 RDD 中的文本正文不包含任何换行符,而是包含方括号内的 JSON 对象(就像我上面显示的第二个示例)。
虽然这是一种变通方法,但遗憾的是缺乏;首先,使用 RDD 方法要慢得多,更重要的是,我需要获取一列我从中获取此信息的文件的名称。我知道在直接从文件加载时使用 pyspark.sql.functions
模块中的 input_file_name
函数是可能的,但是在使用 RDD 方法时这不起作用.我已经设法编写了一个纯 Python 函数,它从每个 pairedRDD 的第一个元素中获取文件名信息到 JSON 字符串中,但这速度慢得令人痛苦。
如果有人能帮我解决这个问题,我将不胜感激。我明白我可能不得不使用 RDD 方法,但我很困惑为什么 spark.read.json
在一种情况下工作得很好,而在另一种情况下却不行。
最佳答案
虽然我不确定是什么原因导致一种解决方案有效而另一种无效,但我仅使用 sql.read.json 就能够在一定程度上解决问题。
将 read.json 中的参数 allowComments、allowUnquotedFieldNames、allowSingleQuotes、allowNumericLeadingZero、allowBackslashEscapingAnyCharacter 设置为 True。通过这种方式,我能够删除空值,并且 90% 的数据在没有任何空值的数据框中成功转换。
查看其他参数here
关于json - 将单行文件中的多个 JSON 对象加载到 PySpark 时为空值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47639352/