json - 使用 Scala 将 DataSet 转换为 Spark Json 数组

标签 json scala apache-spark apache-spark-sql apache-spark-dataset

我是 Spark 新手,无法找出以下问题的解决方案。

我有一个 JSON 文件需要解析,然后创建几个指标并将数据写回 JSON 格式。

现在以下是我正在使用的代码

import org.apache.spark.sql._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.functions._

object quick2 {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val spark = SparkSession
      .builder
      .appName("quick1")
      .master("local[*]")
      .getOrCreate()

    val rawData = spark.read.json("/home/umesh/Documents/Demo2/src/main/resources/sampleQuick.json")

    val mat1 = rawData.select(rawData("mal_name"),rawData("cust_id")).distinct().orderBy("cust_id").toJSON.cache()
    val mat2 = rawData.select(rawData("file_md5"),rawData("mal_name")).distinct().orderBy(asc("file_md5")).toJSON.cache()

val write1 = mat1.coalesce(1).toJavaRDD.saveAsTextFile("/home/umesh/Documents/Demo2/src/test/mat1/")

val write = mat2.coalesce(1).toJavaRDD.saveAsTextFile("/home/umesh/Documents/Demo2/src/test/mat2/")
}
}

现在上面的代码正在编写正确的 json 格式。 然而,矩阵也可以包含重复的结果 示例:

md5   mal_name
1       a
1       b
2       c
3       d
3       e

因此,使用上面的代码,每个对象都以单行写入

像这样

{"file_md5":"1","mal_name":"a"}
{"file_md5":"1","mal_name":"b"}
{"file_md5":"2","mal_name":"c"}
{"file_md5":"3","mal_name":"d"}

等等。

但我想组合公共(public)键的数据:

所以输出应该是

{"file_md5":"1","mal_name":["a","b"]}

有人可以建议我在这里做什么吗?或者是否有其他更好的方法来解决这个问题。

谢谢!

最佳答案

  1. You can use collect_list or collect_set as per your need on mal_name column
  2. You can directly save DataFrame/DataSet directly as JSON file
import org.apache.spark.sql.functions.{alias, collect_list}
import spark.implicits._

rawData.groupBy($"file_md5")
  .agg(collect_set($"mal_name").alias("mal_name"))
  .write
  .format("json")
  .save("json/file/location/to/save")

关于json - 使用 Scala 将 DataSet 转换为 Spark Json 数组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45279449/

相关文章:

java - 如何将 java.lang.String 的空白 JSON 字符串值反序列化为 null?

scala - 将 n 个元素的 RDD 转换为单个元素的 RDD

scala - Akka Actor 有多重?

hadoop - YARN 阈值错误

scala - 如何在 apache Spark 中展平数据框 |斯卡拉

javascript - 如何使用变量值将 List<object> 序列化为 JSON?

python - 在Python中将树结构转换为csv的最有效方法的研究

Javascript 数组转换为 JSON 对象而不是数组

scala - 为什么 immutable.Map 的值类型是协变的,而 mutable 的值类型不是

scala - 在 Spark JDBC 读取方法中使用谓词