scala - Spark union 因嵌套 JSON 数据帧而失败

标签 scala apache-spark union apache-spark-sql

我有以下两个 JSON 文件:

{
    "name" : "Agent1",
    "age" : "32",
    "details" : [{
            "d1" : 1,
            "d2" : 2
        }
    ]
}

{
    "name" : "Agent2",
    "age" : "42",
    "details" : []
}

我用 Spark 读了它们:

val jsonDf1 = spark.read.json(pathToJson1)
val jsonDf2 = spark.read.json(pathToJson2)

使用以下模式创建两个数据框:

root
 |-- age: string (nullable = true)
 |-- details: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- d1: long (nullable = true)
 |    |    |-- d2: long (nullable = true)
 |-- name: string (nullable = true)

root
|-- age: string (nullable = true)
|-- details: array (nullable = true)
|    |-- element: string (containsNull = true)
|-- name: string (nullable = true)

当我尝试与这两个数据帧执行联合时,我收到此错误:

jsonDf1.union(jsonDf2)


org.apache.spark.sql.AnalysisException: unresolved operator 'Union;;
'Union
:- LogicalRDD [age#0, details#1, name#2]
+- LogicalRDD [age#7, details#8, name#9]

我该如何解决这个问题?有时,我会在 Spark 作业将加载的 JSON 文件中得到空数组,但它仍然必须统一它们,这应该不是问题,因为 Json 文件的架构是相同的。

最佳答案

如果您尝试合并 2 个数据帧,您将得到以下结果:

error:org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. ArrayType(StringType,true) <> ArrayType(StructType(StructField(d1,StringType,true), StructField(d2,StringType,true)),true) at the second column of the second table

Json文件同时到达

要解决这个问题,如果你可以同时读取JSON,我建议:

val jsonDf1 = spark.read.json("json1.json", "json2.json")

这将给出这个架构:

jsonDf1.printSchema
 |-- age: string (nullable = true)
 |-- details: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- d1: long (nullable = true)
 |    |    |-- d2: long (nullable = true)
 |-- name: string (nullable = true)

数据输出

jsonDf1.show(10,truncate = false)
+---+-------+------+
|age|details|name  |
+---+-------+------+
|32 |[[1,2]]|Agent1|
|42 |null   |Agent2|
+---+-------+------+

Json 文件到达的时间不同

如果您的 json 在不同时间到达,作为默认解决方案,我建议读取具有完整数组的模板 JSON 对象,这将使您的数据帧具有可能的空数组,对任何联合都有效。然后,在输出结果之前,您将使用过滤器删除这个假 JSON:

val df = spark.read.json("jsonWithMaybeAnEmptyArray.json", 
"TemplateFakeJsonWithAFullArray.json")

df.filter($"name" !== "FakeAgent").show(1)

请注意:已开放 Jira 卡以提高合并 SQL 数据类型的能力:https://issues.apache.org/jira/browse/SPARK-19536并且这种操作应该在下一个Spark版本中可以实现。

关于scala - Spark union 因嵌套 JSON 数据帧而失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42530431/

相关文章:

scala - Spark : How to run logistic regression using only some features from LabeledPoint?

maven-2 - 我如何找出 Apache Buildr/Maven 2 存储库名称

apache-spark - 忽略了 JSON 阅读器中的 Spark 采样选项?

iphone - 如何在objectiveC类中使用struct和union?

sql - LIMIT 1的替代方案,以提取日期范围以及前后的第一个条目?

SQL - 1 个父表,2 个子表 - 为子表中的每一行返回单行

scala - 在 Play2 SecureSocial 上,POST 在 SecureadAction 之后作为 GET 执行

scala - 使用 Scala 和 Dispatch 获取 HTTP 响应作为 Array[Byte]

java - Clojure:将 Spark 工厂方法与 Java 互操作结合使用

python - Spark 公平调度不起作用