json - Spark 数据帧 : reading json having duplicate column names but different datatypes

标签 json apache-spark apache-spark-sql jackson jsonschema

我有如下所示的 json 数据,其中版本字段是区分符 -

file_1 = {"version": 1, "stats": {"hits":20}}

file_2 = {"version": 2, "stats": [{"hour":1,"hits":10},{"hour":2,"hits":12}]}

在新格式中,stats 列现在是 Arraytype(StructType)

之前只需要 file_1 所以我用的是

spark.read.schema(schema_def_v1).json(path)

现在我需要读取这些类型的多个 json 文件。我不能在 schema_def 中将 stats 定义为字符串,因为这会影响 corruptrecord 功能(对于 stats 列),该功能检查所有字段的格式错误的 json 和模式合规性。

1 只读中所需的示例 df 输出 -

version | hour | hits
1       | null | 20
2       | 1    | 10
2       | 2    | 12

我尝试使用 mergeSchema 选项进行读取,但这使得统计字段成为字符串类型。

此外,我尝试通过过滤版本字段并应用 spark.read.schema(schema_def_v1).json(df_v1.toJSON) 来制作两个数据帧。这里的 stats 列也变成了 String 类型。

我在想,如果在阅读时,我可以根据数据类型将 df 列标题解析为 stats_v1stats_v2 可以解决问题。请帮助解决任何可能的问题。

最佳答案

UDF 用于检查字符串或数组,如果是字符串则将字符串转换为数组。

import org.apache.spark.sql.functions.udf
import org.json4s.{DefaultFormats, JObject}
import org.json4s.jackson.JsonMethods.parse
import org.json4s.jackson.Serialization.write
import scala.util.{Failure, Success, Try}

object Parse {
    implicit val formats = DefaultFormats
    def toArray(data:String) = {
      val json_data = (parse(data))
      if(json_data.isInstanceOf[JObject]) write(List(json_data)) else data
    }
}

val toJsonArray = udf(Parse.toArray _)

scala> "ls -ltr /tmp/data".!
total 16
-rw-r--r--  1 srinivas  root  37 Jun 26 17:49 file_1.json
-rw-r--r--  1 srinivas  root  69 Jun 26 17:49 file_2.json
res4: Int = 0

scala> val df = spark.read.json("/tmp/data").select("stats","version")
df: org.apache.spark.sql.DataFrame = [stats: string, version: bigint]

scala> df.printSchema
root
 |-- stats: string (nullable = true)
 |-- version: long (nullable = true)

scala> df.show(false)
+-------+-------------------------------------------+
|version|stats                                      |
+-------+-------------------------------------------+
|1      |{"hits":20}                                |
|2      |[{"hour":1,"hits":10},{"hour":2,"hits":12}]|
+-------+-------------------------------------------+

输出

scala> 

import org.apache.spark.sql.types._
val schema = ArrayType(MapType(StringType,IntegerType))

df
.withColumn("json_stats",explode(from_json(toJsonArray($"stats"),schema)))
.select(
    $"version",
    $"stats",
    $"json_stats".getItem("hour").as("hour"),
    $"json_stats".getItem("hits").as("hits")
).show(false)

+-------+-------------------------------------------+----+----+
|version|stats                                      |hour|hits|
+-------+-------------------------------------------+----+----+
|1      |{"hits":20}                                |null|20  |
|2      |[{"hour":1,"hits":10},{"hour":2,"hits":12}]|1   |10  |
|2      |[{"hour":1,"hits":10},{"hour":2,"hits":12}]|2   |12  |
+-------+-------------------------------------------+----+----+

没有 UDF

scala> val schema = ArrayType(MapType(StringType,IntegerType))

scala> val expr = when(!$"stats".contains("[{"),concat(lit("["),$"stats",lit("]"))).otherwise($"stats")

df
.withColumn("stats",expr)
.withColumn("stats",explode(from_json($"stats",schema)))
.select(
    $"version",
    $"stats",
    $"stats".getItem("hour").as("hour"),
    $"stats".getItem("hits").as("hits")
)
.show(false)

+-------+-----------------------+----+----+
|version|stats                  |hour|hits|
+-------+-----------------------+----+----+
|1      |[hits -> 20]           |null|20  |
|2      |[hour -> 1, hits -> 10]|1   |10  |
|2      |[hour -> 2, hits -> 12]|2   |12  |
+-------+-----------------------+----+----+

关于json - Spark 数据帧 : reading json having duplicate column names but different datatypes,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62559096/

相关文章:

java - HTTP 补丁 - RFC 6902 - JAVA Json 库

python - 使用自定义变量消息通过 AWS Lambda/Python 以 JSON 形式发送 Apple 推送通知

amazon-ec2 - Spark 1.3.1 : cannot read file from S3 bucket, org/jets3t/service/ServiceException

scala - 如何将 Spark 的累加器传递给函数?

apache-spark - 如何在组中找到第一个非空值? (使用数据集api进行二次排序)

java - 转换结果 java.lang.NullPointerException 时出错 & 解析数据 org.json.JSONException 时出错 : End of input at character 0 of

java - JSONObject 到 Java 表示

windows - 在 Windows 上使用 Staging S3A Committer 写入 S3 时出现 UnsatisfiedLinkError

scala - 如何在Spark sql中访问HIVE ACID表?

pyspark - pyspark 中 rdd 的映射如何工作?