apache-spark - 使用 scala 从 Spark 中的数组数组中的结构中提取值

标签 apache-spark apache-spark-sql

我正在使用 scala 将 json 数据读入 Spark 数据帧。 架构如下:

 root
 |-- metadata: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- playerId: string (nullable = true)
 |    |    |-- sources: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- matchId: long (nullable = true)

数据如下:

{ "metadata" : [ { "playerId" : "1234", "sources" : [ { "matchId": 1 } ] }, { "playerId": "1235", "sources": [ { "matchId": 1 } ] } ] }
{ "metadata" : [ { "playerId" : "1234", "sources" : [ { "matchId": 2 } ] }, { "playerId": "1248", "sources": [ { "score": 12.2 , "matchId": 1 } ] } ] }
{ "metadata" : [ { "playerId" : "1234", "sources" : [ { "matchId": 3 } ] }, { "playerId": "1248", "sources": [ { "matchId": 3 } ] } ] }

目标是查找playerId是否为1234且matchId是否为1,然后返回isPlayed为true。来源的结构不是固定的。可能还有 matchId 以外的字段。

我编写了一个 udf,考虑元数据的类型为 WrappedArray[String],并且能够读取 playerId 列

def hasPlayer = udf((metadata: WrappedArray[String], playerId: String) => { 
  metadata.contains(playerId)
  })

df.withColumn("hasPlayer", hasPlayer(col("metadata"), col("superPlayerId")))

但是我无法弄清楚如何查询给定的playerId的matchId字段。我尝试将该字段读取为 WrappedArray[WrappedArray[Long]],但它在metadata.sources.matchId 列的 withColumn 中给出了类型转换异常。

我对 Spark 比较陌生。任何帮助将不胜感激。

干杯!

最佳答案

当你处理JSON时,了解一下内置函数explode ,这会变成包含 WrappedArray 的单个单元格分成代表内部结构的多行。我认为这有帮助(两次):

df.select(explode($"metadata").as("metadata"))
  .select($"metadata.playerId", explode($"metadata.sources.matchId").as("matchId"))
  .filter($"matchId".equalTo(1))
  .select($"matchId", lit(true).as("isPlayed"))

基本上我用explode要创建多行(并重命名为方便的名称),请将对象树导航到我想要的 JSON 字段,重复 explode/重命名 matchId 的进程,并过滤掉所有不是 1 的内容这让我终于可以使用 lit函数对 true 的值进行“硬编码”名为 isPlayed 的全新专栏因为所有不是1的东西消失了。

如果这不是您正在寻找的内容,希望它能为您提供一些指导。 functions library当您了解 Spark 时,它会对您非常有帮助。

关于apache-spark - 使用 scala 从 Spark 中的数组数组中的结构中提取值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43424554/

相关文章:

apache-spark - 用 pyspark 用以前已知的好值填充 null

apache-spark - 在pyspark中,使用df.write.partitionBy(..).save时如何对某一列的部分值进行分区?

java - Spark 2.0.0 抛出 AlreadyExistsException(消息 :Database default already exists) when interact with Hive 1. 0.0

apache-spark - Spark 数据集过滤器性能

java - 使用 java 将索引列添加到 apache Spark Dataset<Row>

java - 如何解决此错误 "Type mismatch: cannot convert from List<String> to Iterator<String>"

python - 在 pyspark 中找不到 col 函数

apache-spark - 尝试将数据帧 Spark 保存到 HDFS 文件时出错

python - PySpark 中的列过滤

scala - 数据集过滤器: eta expansion is not done automatically