我正在使用 scala 将 json 数据读入 Spark 数据帧。 架构如下:
root
|-- metadata: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- playerId: string (nullable = true)
| | |-- sources: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- matchId: long (nullable = true)
数据如下:
{ "metadata" : [ { "playerId" : "1234", "sources" : [ { "matchId": 1 } ] }, { "playerId": "1235", "sources": [ { "matchId": 1 } ] } ] }
{ "metadata" : [ { "playerId" : "1234", "sources" : [ { "matchId": 2 } ] }, { "playerId": "1248", "sources": [ { "score": 12.2 , "matchId": 1 } ] } ] }
{ "metadata" : [ { "playerId" : "1234", "sources" : [ { "matchId": 3 } ] }, { "playerId": "1248", "sources": [ { "matchId": 3 } ] } ] }
目标是查找playerId是否为1234且matchId是否为1,然后返回isPlayed为true。来源的结构不是固定的。可能还有 matchId 以外的字段。
我编写了一个 udf,考虑元数据的类型为 WrappedArray[String],并且能够读取 playerId 列
def hasPlayer = udf((metadata: WrappedArray[String], playerId: String) => {
metadata.contains(playerId)
})
df.withColumn("hasPlayer", hasPlayer(col("metadata"), col("superPlayerId")))
但是我无法弄清楚如何查询给定的playerId的matchId字段。我尝试将该字段读取为 WrappedArray[WrappedArray[Long]],但它在metadata.sources.matchId 列的 withColumn 中给出了类型转换异常。
我对 Spark 比较陌生。任何帮助将不胜感激。
干杯!
最佳答案
当你处理JSON时,了解一下内置函数explode
,这会变成包含 WrappedArray
的单个单元格分成代表内部结构的多行。我认为这有帮助(两次):
df.select(explode($"metadata").as("metadata"))
.select($"metadata.playerId", explode($"metadata.sources.matchId").as("matchId"))
.filter($"matchId".equalTo(1))
.select($"matchId", lit(true).as("isPlayed"))
基本上我用explode
要创建多行(并重命名为方便的名称),请将对象树导航到我想要的 JSON 字段,重复 explode
/重命名 matchId
的进程,并过滤掉所有不是 1
的内容这让我终于可以使用 lit
函数对 true
的值进行“硬编码”名为 isPlayed
的全新专栏因为所有不是1
的东西消失了。
如果这不是您正在寻找的内容,希望它能为您提供一些指导。 functions
library当您了解 Spark 时,它会对您非常有帮助。
关于apache-spark - 使用 scala 从 Spark 中的数组数组中的结构中提取值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43424554/