我有一个文件,其中每一行都是字符串化的 JSON。 我想将其读入 Spark DataFrame 以及架构验证。
天真的方法是:
val schema: StructType = getSchemaFromSomewhere()
val df: DataFrame = spark.read
.option("mode", "DROPMALFORMED")
.format("json")
.schema(schema)
.load("path/to/data.json")
但是,这种方法仅执行一些非常基本的架构验证。
- 如果一行无法解析为 json - 它将被删除。
- 如果某行包含的属性的值无法转换为
架构
定义的类型 - 它将被删除。 - 但是 - 此加载方法会忽略不可为空的字段(使它们在生成的 DF 中可以为空),并且不允许填写默认值。
方法 2 - 使用 JsonSchema
为了做到这一点,我不能再使用 spark.read.json()
因为我需要数据采用 JsonNode
格式。
因此,我将其作为文本文件读取并使用 JsonSchema 库解析它:
def getJsonSchemaFactory: JsonSchemaFactory = JsonSchemaFactory.byDefault
def stringToJsonSchema(str: String): Try[JsonSchema] = {
stringToJson(str).map(getJsonSchemaFactory.getJsonSchema(_))
}
def stringToJson(str: String): Try[JsonNode] = {
val mapper = new ObjectMapper
Try({
val json = mapper.readTree(str)
json
})
}
def validateJson(data: JsonNode): Boolean = {
jsonSchema.exists(jsonSchema => {
val report = jsonSchema.validateUnchecked(data, true)
report.isSuccess
})
}
lazy val jsonSchema: Option[JsonSchema] = stringToJsonSchema(schemaSource).toOption
val schema: StructType = getSchemaFromSomewhere()
val df = spark.read
.textFile("path/to/data.json")
.filter(str => {
stringToJson(str)
.map(validateJson)
.getOrElse(false)
})
.select(from_json($"value", schema) as "jsonized")
.select("jsonized.*")
现在的问题是,我将每个 string
行解析为 json 两次 - 一次在 filter
内,另一次在 select(from_json .. .)
.
我在寻找什么
某种方法可以将 JSON 数据从文件读取到 DataFrame,同时对所有数据应用 JsonSchema 验证 - 无效数据应该被删除(并且可能也记录在某处)。
- 有没有办法将
Dataset[JsonNode]
转换为DataFrame
而无需多次解析? - 有没有办法将 DF
Row
转换为JsonNode
对象?这样我就可以翻转顺序 - 首先使用spark.read.json()
读取 DF,然后通过将每个Row
转换为JsonNode
来过滤 DF code> 并应用JsonSchema
。 - 我还缺少其他一些方式吗?
谢谢
最佳答案
Is there a way to convert Dataset[JsonNode] to a DataFrame without parsing it more than once?
在大多数情况下,与作业的总 CPU 使用率相比,解析两次的开销可能可以忽略不计。
如果您的情况并非如此,您可以在 DataSourceV2
中实现您自己的 TableProvider
。如果解析要求可能随着时间的推移而改变或演变,这可能是一个不错的长期解决方案。
关于json - Spark : Use JsonSchema to parse Json into DataFrame,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71254983/