json - 使用 Spark 和 scala 从 Spark 数据帧中的 JSON 类型列中获取所有值,而不考虑键

标签 json scala apache-spark

我试图使用 Spark 加载包含一些电影元数据的 TSV 文件,该 TSV 文件包含 JSON 格式的电影流派信息 [每行的最后一列]

示例文件

975900  /m/03vyhn   Ghosts of Mars  2001-08-24  14010832    98.0    {"/m/02h40lc": "English Language"}  {"/m/09c7w0": "United States of America"}   {"/m/01jfsb": "Thriller", "/m/06n90": "Science Fiction", "/m/03npn": "Horror", "/m/03k9fj": "Adventure", "/m/0fdjb": "Supernatural", "/m/02kdv5l": "Action", "/m/09zvmj": "Space western"}
3196793 /m/08yl5d   Getting Away with Murder: The JonBenét Ramsey Mystery   2000-02-16      95.0    {"/m/02h40lc": "English Language"}  {"/m/09c7w0": "United States of America"}   {"/m/02n4kr": "Mystery", "/m/03bxz7": "Biographical film", "/m/07s9rl0": "Drama", "/m/0hj3n01": "Crime Drama"}

我尝试了下面的代码,它使我能够从流派 JSON 访问特定值

val ss = SessionCreator.createSession("DataCleaning", "local[*]")//helper function creates a spark session and returns it
val headerInfoRb = ResourceBundle.getBundle("conf.headerInfo")
val movieDF = DataReader.readFromTsv(ss, "D:/Utility/Datasets/MovieSummaries/movie.metadata.tsv")
                .toDF(headerInfoRb.getString("metadataReader").split(',').toSeq:_*)//Datareader.readFromTsv is a helper function to read TSV file ,takes sparkSession and file path as input to resurn a dataframe, which uses sparkSession's read function 

movieDF.select("wiki_mv_id","mv_nm","mv_genre")
                .withColumn("genre_frmttd", get_json_object(col("mv_genre"), "$./m/02kdv5l"))
                .show(1,false)

输出

+----------+--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
|wiki_mv_id|mv_nm         |mv_genre                                                                                                                                                                                  |genre_frmttd|
+----------+--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
|975900    |Ghosts of Mars|{"/m/01jfsb": "Thriller", "/m/06n90": "Science Fiction", "/m/03npn": "Horror", "/m/03k9fj": "Adventure", "/m/0fdjb": "Supernatural", "/m/02kdv5l": "Action", "/m/09zvmj": "Space western"}|Action      |
+----------+--------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------+
only showing top 1 row

我希望数据框中的每一行都按照下面所示的方式rieve_frmttd列[下面的代码片段适用于第一个示例行]

[Thriller,Fiction,Horror,Adventure,Supernatural,Action,Space Western]

我是 scala 和 Spark 的新手,请建议一些列出值的方法

最佳答案

  1. 使用 from_json 解析 JSON
  2. 将其转换为MapType(StringType, StringType)
  3. 使用map_values仅提取值
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{MapType, StringType}

movieDF.select("wiki_mv_id","mv_nm","mv_genre")
      .withColumn("genre_frmttd",map_values(from_json(col("mv_genre"),MapType(StringType, StringType))))
      .show(1,false)

关于json - 使用 Spark 和 scala 从 Spark 数据帧中的 JSON 类型列中获取所有值,而不考虑键,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62069074/

相关文章:

java - 使用Java和Jersey对以categoryID为键的类别进行JSON反序列化

java - 没有在 Spring Controller 方法中获取 JSON 值

jquery - 如果数组之一 = 我的值,则添加所选内容

scala - 在 Scala 规范中同步之前/之后的方法?

apache-spark - 使用 Dataframes 从 Informix 到 Spark 的 JDBC

json - 使用 AJAX 调用从 Symfony2 Controller 返回 JSONP

xml - 在测试中避免 Scala XML 的 <a>b {"c"}</a> != <a>b c</a> 行为

javascript - 在 liftweb 静态内容中嵌入 javascript for 循环的问题

apache-spark - 修改 ArrayType 中的所有元素

scala - 从分区的 Parquet 文件读取DataFrame