json - 如何在 Spark 2 Scala 中将 Row 转换为 json

标签 json scala apache-spark json4s

有没有一种简单的方法可以将给定的 Row 对象转换为 json?

发现这个关于将整个数据帧转换为 json 输出:
Spark Row to JSON

但我只想将一行转换为 json。
这是我正在尝试做的伪代码。

更准确地说,我正在读取 json 作为数据帧中的输入。
我正在生成一个主要基于列的新输出,但有一个 json 字段用于所有不适合列的信息。

我的问题是编写此函数的最简单方法是什么:convertRowToJson()

def convertRowToJson(row: Row): String = ???

def transformVenueTry(row: Row): Try[Venue] = {
  Try({
    val name = row.getString(row.fieldIndex("name"))
    val metadataRow = row.getStruct(row.fieldIndex("meta"))
    val score: Double = calcScore(row)
    val combinedRow: Row = metadataRow ++ ("score" -> score)
    val jsonString: String = convertRowToJson(combinedRow)
    Venue(name = name, json = jsonString)
  })
}

Psidom 的解决方案:
def convertRowToJSON(row: Row): String = {
    val m = row.getValuesMap(row.schema.fieldNames)
    JSONObject(m).toString()
}

仅当 Row 只有一层而不是嵌套 Row 时才有效。这是架构:
StructType(
    StructField(indicator,StringType,true),   
    StructField(range,
    StructType(
        StructField(currency_code,StringType,true),
        StructField(maxrate,LongType,true), 
        StructField(minrate,LongType,true)),true))

还尝试了 Artem 建议,但没有编译:
def row2DataFrame(row: Row, sqlContext: SQLContext): DataFrame = {
  val sparkContext = sqlContext.sparkContext
  import sparkContext._
  import sqlContext.implicits._
  import sqlContext._
  val rowRDD: RDD[Row] = sqlContext.sparkContext.makeRDD(row :: Nil)
  val dataFrame = rowRDD.toDF() //XXX does not compile
  dataFrame
}

最佳答案

您可以使用 getValuesMap将行对象转换为 Map,然后将其转换为 JSON:

import scala.util.parsing.json.JSONObject
import org.apache.spark.sql._

val df = Seq((1,2,3),(2,3,4)).toDF("A", "B", "C")    
val row = df.first()          // this is an example row object

def convertRowToJSON(row: Row): String = {
    val m = row.getValuesMap(row.schema.fieldNames)
    JSONObject(m).toString()
}

convertRowToJSON(row)
// res46: String = {"A" : 1, "B" : 2, "C" : 3}

关于json - 如何在 Spark 2 Scala 中将 Row 转换为 json,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41601874/

相关文章:

scala - 方法参数中多个连续的粗箭头在 Scala 中意味着什么?

apache-spark - 不同端口上的 Spark Thrift 服务器

scala - 结构化流 - 无法使用 FileContext API 管理 AWS S3 上的元数据日志文件

JavaScript 根据值对对象数组进行排序(条件排序)

java - 忽略 BigQuery 中的错误 CSV 记录

javascript - 如何对 Javascript 对象进行排序,或将其转换为数组?

json - 通过circe修改json字段类型

scala - Zeppelin 集群模式不适用于 spark 1.2 Ambari、Hortonworks Cluster

java - Spark 2.0.0 : Read from Cassandra in Cluster mode

php - 从 PHP、JSON Highcharts 获取数据?