scala - Spark JSON 文本字段到 RDD

标签 scala cassandra apache-spark rdd

我有一个 cassandra 表,其中包含一个名为 snapshot 的文本类型字段,其中包含 JSON 对象:

[identifier, timestamp, snapshot]

我知道为了能够使用 Spark 对该字段进行转换,我需要将该 RDD 的该字段转换为另一个 RDD 以对 JSON 模式进行转换。

那是对的吗?我应该如何处理?

编辑:现在我设法从单个文本字段创建一个 RDD:
val conf = new SparkConf().setAppName("signal-aggregation")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val snapshots = sc.cassandraTable[(String, String, String)]("listener", "snapshots")
val first = snapshots.first()
val firstJson = sqlContext.jsonRDD(sc.parallelize(Seq(first._3)))
firstJson.printSchema()

这向我展示了 JSON 模式。好的!

我如何继续告诉 Spark 该模式应该应用于表 Snapshots 的所有行,以从每一行获取该快照字段上的 RDD?

最佳答案

快到了,您只想将带有 json 的 RDD[String] 传递到jsonRDD方法

val conf = new SparkConf().setAppName("signal-aggregation")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val snapshots = sc.cassandraTable[(String, String, String)]("listener", "snapshots")
val jsons = snapshots.map(_._3) // Get Third Row Element Json(RDD[String]) 
val jsonSchemaRDD = sqlContext.jsonRDD(jsons) // Pass in RDD directly
jsonSchemaRDD.registerTempTable("testjson")
sqlContext.sql("SELECT * FROM testjson where .... ").collect 

一个简单的例子
val stringRDD = sc.parallelize(Seq(""" 
  { "isActive": false,
    "balance": "$1,431.73",
    "picture": "http://placehold.it/32x32",
    "age": 35,
    "eyeColor": "blue"
  }""",
   """{
    "isActive": true,
    "balance": "$2,515.60",
    "picture": "http://placehold.it/32x32",
    "age": 34,
    "eyeColor": "blue"
  }""", 
  """{
    "isActive": false,
    "balance": "$3,765.29",
    "picture": "http://placehold.it/32x32",
    "age": 26,
    "eyeColor": "blue"
  }""")
)
sqlContext.jsonRDD(stringRDD).registerTempTable("testjson")
csc.sql("SELECT age from testjson").collect
//res24: Array[org.apache.spark.sql.Row] = Array([35], [34], [26])

关于scala - Spark JSON 文本字段到 RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30033875/

相关文章:

java - 无法修改我的index.scala.html 文件并且无法连接到本地主机 :9000 in Play framework

Scala 2.12 和 Travis.ci - 如何排除与 Java 6 的组合?

python - 属性错误: 'NoneType' object has no attribute '_jvm - PySpark UDF

apache-spark - 如何减少 EMR 中 Apache Spark 的日志?

java - 为什么运行带有 "./"前缀的 sbt?它不起作用

excel - 从 Excel/VBA 调用 Scala 函数

java - NoClassDefFoundError : io/netty/util/Timer

sql - cassandra:单行可变列数

java - Spring Boot - 当程序失去与数据库的连接时如何执行方法?

scala - 使用scala在Spark中转置没有聚合的DataFrame