json - Spark : Use JsonSchema to parse Json into DataFrame

标签 json scala apache-spark jsonschema json-schema-validator

我有一个文件,其中每一行都是字符串化的 JSON。 我想将其读入 Spark DataFrame 以及架构验证。

天真的方法是:

val schema: StructType = getSchemaFromSomewhere()
val df: DataFrame = spark.read
  .option("mode", "DROPMALFORMED")
  .format("json")
  .schema(schema)
  .load("path/to/data.json")

但是,这种方法仅执行一些非常基本的架构验证。

  • 如果一行无法解析为 json - 它将被删除。
  • 如果某行包含的属性的值无法转换为架构定义的类型 - 它将被删除。
  • 但是 - 此加载方法会忽略不可为空的字段(使它们在生成的 DF 中可以为空),并且不允许填写默认值。

方法 2 - 使用 JsonSchema

为了做到这一点,我不能再使用 spark.read.json() 因为我需要数据采用 JsonNode 格式。 因此,我将其作为文本文件读取并使用 JsonSchema 库解析它:

def getJsonSchemaFactory: JsonSchemaFactory = JsonSchemaFactory.byDefault

def stringToJsonSchema(str: String): Try[JsonSchema] = {
  stringToJson(str).map(getJsonSchemaFactory.getJsonSchema(_))
}

def stringToJson(str: String): Try[JsonNode] = {
  val mapper = new ObjectMapper
  Try({
    val json = mapper.readTree(str)
    json
  })
}

def validateJson(data: JsonNode): Boolean = {
  jsonSchema.exists(jsonSchema => {
    val report = jsonSchema.validateUnchecked(data, true)
    report.isSuccess
  })
}

lazy val jsonSchema: Option[JsonSchema] = stringToJsonSchema(schemaSource).toOption
val schema: StructType = getSchemaFromSomewhere()
val df = spark.read
  .textFile("path/to/data.json")
  .filter(str => {
    stringToJson(str)
      .map(validateJson)
      .getOrElse(false)
  })
  .select(from_json($"value", schema) as "jsonized")
  .select("jsonized.*")

现在的问题是,我将每个 string 行解析为 json 两次 - 一次在 filter 内,另一次在 select(from_json .. .).

我在寻找什么

某种方法可以将 JSON 数据从文件读取到 DataFrame,同时对所有数据应用 JsonSchema 验证 - 无效数据应该被删除(并且可能也记录在某处)。

  • 有没有办法将 Dataset[JsonNode] 转换为 DataFrame 而无需多次解析?
  • 有没有办法将 DF Row 转换为 JsonNode 对象?这样我就可以翻转顺序 - 首先使用 spark.read.json() 读取 DF,然后通过将每个 Row 转换为 JsonNode 来过滤 DF code> 并应用 JsonSchema
  • 我还缺少其他一些方式吗?

谢谢

最佳答案

Is there a way to convert Dataset[JsonNode] to a DataFrame without parsing it more than once?

在大多数情况下,与作业的总 CPU 使用率相比,解析两次的开销可能可以忽略不计。

如果您的情况并非如此,您可以在 DataSourceV2 中实现您自己的 TableProvider。如果解析要求可能随着时间的推移而改变或演变,这可能是一个不错的长期解决方案。

关于json - Spark : Use JsonSchema to parse Json into DataFrame,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71254983/

相关文章:

scala - Spark Bucketizer - 即使没有元素也显示所有桶

scala - 在 Intellij 14.1.3 中运行 Spark 应用程序

python - 如何初始化先验未知数量的列表

scala - 将元素添加到作为映射值的 Scala 集

c# - ListBox SelectionChanged WP7 使用参数导航

swing - Scala中的惯用表格单元渲染器

Scala 扁平化列表

scala - 使用 SBT : Invalid or corrupt jarfile 构建 Apache Spark

javascript - 如何在vuejs中将所有excel数据转换为JSON

javascript - 如何从仅出现在 console.log() 而不是 JSON.stringify() 中的 JavaScript 对象中获取属性?