json - 使用动态模式 Spark from_json

标签 json apache-spark apache-spark-sql

我正在尝试使用 Spark 处理具有可变结构(嵌套 JSON)的 JSON 数据。输入的 JSON 数据可能非常大,每行超过 1000 个键,一批可能超过 20 GB。
整个批次已从 30 个数据源生成,每个 JSON 的“key2”可用于识别源,并且预定义了每个源的结构。

处理此类数据的最佳方法是什么?
我曾尝试使用如下所示的 from_json,但它仅适用于固定模式,并且首先使用它我需要根据每个源对数据进行分组,然后应用该模式。
由于数据量很大,我的首选是仅扫描数据一次并根据预定义的模式从每个源中提取所需的值。

import org.apache.spark.sql.types._ 
import spark.implicits._

val data = sc.parallelize(
    """{"key1":"val1","key2":"source1","key3":{"key3_k1":"key3_v1"}}"""
    :: Nil)
val df = data.toDF


val schema = (new StructType)
    .add("key1", StringType)
    .add("key2", StringType)
    .add("key3", (new StructType)
    .add("key3_k1", StringType))


df.select(from_json($"value",schema).as("json_str"))
  .select($"json_str.key3.key3_k1").collect
res17: Array[org.apache.spark.sql.Row] = Array([xxx])

最佳答案

这只是对@Ramesh Maharjan 答案的重述,但使用了更现代的 Spark 语法。

我发现这种方法潜伏在 DataFrameReader 中它允许您从 Dataset[String] 解析 JSON 字符串变成任意 DataFrame并利用 Spark 为您提供的相同模式推断 spark.read.json("filepath")直接从 JSON 文件读取时。每行的架构可以完全不同。

def json(jsonDataset: Dataset[String]): DataFrame

用法示例:
val jsonStringDs = spark.createDataset[String](
  Seq(
      ("""{"firstname": "Sherlock", "lastname": "Holmes", "address": {"streetNumber": 121, "street": "Baker", "city": "London"}}"""),
      ("""{"name": "Amazon", "employeeCount": 500000, "marketCap": 817117000000, "revenue": 177900000000, "CEO": "Jeff Bezos"}""")))

jsonStringDs.show

jsonStringDs:org.apache.spark.sql.Dataset[String] = [value: string]
+----------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                 
|
+----------------------------------------------------------------------------------------------------------------------+
|{"firstname": "Sherlock", "lastname": "Holmes", "address": {"streetNumber": 121, "street": "Baker", "city": "London"}}|
|{"name": "Amazon", "employeeCount": 500000, "marketCap": 817117000000, "revenue": 177900000000, "CEO": "Jeff Bezos"}  |
+----------------------------------------------------------------------------------------------------------------------+


val df = spark.read.json(jsonStringDs)
df.show(false)

df:org.apache.spark.sql.DataFrame = [CEO: string, address: struct ... 6 more fields]
+----------+------------------+-------------+---------+--------+------------+------+------------+
|CEO       |address           |employeeCount|firstname|lastname|marketCap   |name  |revenue     |
+----------+------------------+-------------+---------+--------+------------+------+------------+
|null      |[London,Baker,121]|null         |Sherlock |Holmes  |null        |null  |null        |
|Jeff Bezos|null              |500000       |null     |null    |817117000000|Amazon|177900000000|
+----------+------------------+-------------+---------+--------+------------+------+------------+

该方法可从 Spark 2.2.0 获得:
http://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.DataFrameReader@json(jsonDataset:org.apache.spark.sql.Dataset[String]):org.apache.spark.sql.DataFrame

关于json - 使用动态模式 Spark from_json,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49088401/

相关文章:

c# - 简洁轻量的API : REST+JSON in . NET

node.js - 找不到合适的模拟服务器来提供带有预定义 url 的 json 响应

apache-spark - group by 子句中的 sparkSQL Map 列

scala - 如何在 SPARK SQL 中使用 LEFT 和 RIGHT 关键字

json - 由于我总是得到空结果,如何使用 arango REST API 进行全文搜索?

javascript - HighStocks工具提示无法正常工作

mysql - Spark - Mysql 连接 - 不支持的主要版本 52.0 错误

apache-spark - pyspark - 将列字符串转换为标题和值

apache-spark - spark 连接数据帧并合并模式

scala - Spark scala 将 Unix 时间转换为时间戳失败