我正在尝试使用 Spark 处理具有可变结构(嵌套 JSON)的 JSON 数据。输入的 JSON 数据可能非常大,每行超过 1000 个键,一批可能超过 20 GB。
整个批次已从 30 个数据源生成,每个 JSON 的“key2”可用于识别源,并且预定义了每个源的结构。
处理此类数据的最佳方法是什么?
我曾尝试使用如下所示的 from_json,但它仅适用于固定模式,并且首先使用它我需要根据每个源对数据进行分组,然后应用该模式。
由于数据量很大,我的首选是仅扫描数据一次并根据预定义的模式从每个源中提取所需的值。
import org.apache.spark.sql.types._
import spark.implicits._
val data = sc.parallelize(
"""{"key1":"val1","key2":"source1","key3":{"key3_k1":"key3_v1"}}"""
:: Nil)
val df = data.toDF
val schema = (new StructType)
.add("key1", StringType)
.add("key2", StringType)
.add("key3", (new StructType)
.add("key3_k1", StringType))
df.select(from_json($"value",schema).as("json_str"))
.select($"json_str.key3.key3_k1").collect
res17: Array[org.apache.spark.sql.Row] = Array([xxx])
最佳答案
这只是对@Ramesh Maharjan 答案的重述,但使用了更现代的 Spark 语法。
我发现这种方法潜伏在 DataFrameReader
中它允许您从 Dataset[String]
解析 JSON 字符串变成任意 DataFrame
并利用 Spark 为您提供的相同模式推断 spark.read.json("filepath")
直接从 JSON 文件读取时。每行的架构可以完全不同。
def json(jsonDataset: Dataset[String]): DataFrame
用法示例:
val jsonStringDs = spark.createDataset[String](
Seq(
("""{"firstname": "Sherlock", "lastname": "Holmes", "address": {"streetNumber": 121, "street": "Baker", "city": "London"}}"""),
("""{"name": "Amazon", "employeeCount": 500000, "marketCap": 817117000000, "revenue": 177900000000, "CEO": "Jeff Bezos"}""")))
jsonStringDs.show
jsonStringDs:org.apache.spark.sql.Dataset[String] = [value: string]
+----------------------------------------------------------------------------------------------------------------------+
|value
|
+----------------------------------------------------------------------------------------------------------------------+
|{"firstname": "Sherlock", "lastname": "Holmes", "address": {"streetNumber": 121, "street": "Baker", "city": "London"}}|
|{"name": "Amazon", "employeeCount": 500000, "marketCap": 817117000000, "revenue": 177900000000, "CEO": "Jeff Bezos"} |
+----------------------------------------------------------------------------------------------------------------------+
val df = spark.read.json(jsonStringDs)
df.show(false)
df:org.apache.spark.sql.DataFrame = [CEO: string, address: struct ... 6 more fields]
+----------+------------------+-------------+---------+--------+------------+------+------------+
|CEO |address |employeeCount|firstname|lastname|marketCap |name |revenue |
+----------+------------------+-------------+---------+--------+------------+------+------------+
|null |[London,Baker,121]|null |Sherlock |Holmes |null |null |null |
|Jeff Bezos|null |500000 |null |null |817117000000|Amazon|177900000000|
+----------+------------------+-------------+---------+--------+------------+------+------------+
该方法可从 Spark 2.2.0 获得:
http://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.DataFrameReader@json(jsonDataset:org.apache.spark.sql.Dataset[String]):org.apache.spark.sql.DataFrame
关于json - 使用动态模式 Spark from_json,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49088401/