scala - 将外部json文件读入RDD并提取scala中的特定值

标签 scala apache-spark

首先,我对 scala 和 Spark 完全陌生,尽管对 pyspark 有点熟悉。我正在使用外部 json 文件,该文件非常大,并且不允许我将其转换为数据集或数据帧。我必须对纯 RDD 执行操作。

所以我想知道如何获得键的具体值。所以我将我的 json 文件读取为 sc.textFile("information.json") 现在通常在 python 中我会这样做

x = sc.textFile("information.json").map(lambda x: json.loads(x))\ 
 .map(lambda x: (x['name'],x['roll_no'])).collect()

RDD中的scala(提取特定键的值)中是否有与上述代码等效的代码,而无需转换为数据帧或数据集。

本质上与 Equivalent pyspark's json.loads function for spark-shell 相同的问题但希望得到更具体和菜鸟友好的答案。谢谢

Json数据: {"name":"ABC", "roll_no":"12", "Major":"CS"}

最佳答案

选项1:RDD API + json4s lib

一种方法是使用 json4s图书馆。该库已在 Spark 内部使用。

import org.json4s._
import org.json4s.jackson.JsonMethods._

// {"name":"ABC1", "roll_no":"12", "Major":"CS1"}
// {"name":"ABC2", "roll_no":"13", "Major":"CS2"}
// {"name":"ABC3", "roll_no":"14", "Major":"CS3"}
val file_location = "information.json"

val rdd = sc.textFile(file_location)

rdd.map{ row =>
  val json_row = parse(row)

  (compact(json_row \ "name"), compact(json_row \ "roll_no"))
}.collect().foreach{println _}

// Output
// ("ABC1","12")
// ("ABC2","13")
// ("ABC3","14")

首先我们将行数据解析为 json_row,然后使用运算符 \ 访问该行的属性,即:json_row\"name"。最终结果是name,roll_no

的元组序列

选项 2:dataframe API + get_json_object()

更直接的方法是通过数据帧 API 结合 get_json_object() 函数。

import org.apache.spark.sql.functions.get_json_object

val df = spark.read.text(file_location)

df.select(
  get_json_object($"value","$.name").as("name"),
  get_json_object($"value","$.roll_no").as("roll_no"))
.collect()
.foreach{println _}

// [ABC1,12]
// [ABC2,13]
// [ABC3,14]

关于scala - 将外部json文件读入RDD并提取scala中的特定值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58037893/

相关文章:

java - Spark 斯卡拉 : Convert DataFrame OR Dataset to single comma separated string

java - Spark 错误 : NoSuchMethodError: scala. Predef$.$conforms()Lscala/Predef$$less$colon$less

scala - 如何使用 Slick 3.0 编写可读的嵌套连接查询

exception - Spark java.lang.SecurityException : class "javax.servlet.FilterRegistration"' with sbt

azure - UPDATE、DROP COLUMN 和 EXCEPT 在 Spark SQL 中不起作用

java - Spark DataFrame 类的 union() 方法在哪里?

java - Scala Swing 插入符位置

scala - 如何最有效地将 Scala DataFrame 的 Row 转换为 case 类?

hadoop - 如何从 Spark MLlib FP Growth 模型中提取数据

scala - 在 Spark 线性回归中获取协方差矩阵