json - 将 JSON 文件读入 Spark 数据集并从单独的 Map 添加列

标签 json scala apache-spark apache-spark-sql apache-spark-dataset

Spark 2.1 和 Scala 2.11 在这里。我有一个大 Map[String,Date]其中有 10K 个键/值对。我还有 10K JSON 文件存在于 Spark 可访问的文件系统上:

mnt/
    some/
        path/
            data00001.json
            data00002.json
            data00003.json
            ...
            data10000.json

映射中的每个 KV 对对应于其各自的 JSON 文件(因此第一个映射 KV 对对应于 data00001.json 等)

我想将所有这些 JSON 文件读入 1 个大型 Spark Dataset并且,当我在做的时候,向这个数据集添加两个新列(JSON 文件中不存在)。每个映射键将是第一个新列的值,每个键的值将是第二个新列的值:
val objectSummaries = getScalaList()
val dataFiles = objectSummaries.filter { _.getKey.endsWith("data.json") }
val dataDirectories = dataFiles.map(dataFile => {
  val keyComponents = dataFile.getKey.split("/")
  val parent = if (keyComponents.length > 1) keyComponents(keyComponents.length - 2) else "/"
  (parent, dataFile.getLastModified)
})

// TODO: How to take each KV pair from dataDirectories above and store them as the values for the
// two new columns?
val allDataDataset = spark.read.json("mnt/some/path/*.json")
  .withColumn("new_col_1", dataDirectories._1)
  .withColumn("new_col_2", dataDirectories._2)

我已经确认,当我删除 mnt/some/path/*.json 时,Spark 将遵守通配符( withColumn )并将所有 JSON 文件读入单个数据集。方法并做一个 allData.show() .所以我在那里一切都很好。

我正在挣扎的是:如何添加两个新列,然后正确提取所有键/值映射元素?

最佳答案

如果我理解正确,您想将 map 中的 KV 与 json 文件中的数据帧相关联。

我将尝试将问题简化为仅排序的 3 个文件和 3 个键值。

val kvs = Map("a" -> 1, "b" -> 2, "c" -> 3)
val files = List("data0001.json", "data0002.json", "data0003.json")

定义一个 case 类来处理更简单的文件、键、值
case class FileWithKV(fileName: String, key: String, value: Int)

将压缩文件和 kvs
val filesWithKVs = files.zip(kvs)
  .map(p => FileWithKV(p._1, p._2._1, p._2._2))

它看起来像这样
filesWithKVs: List[FileWithKV] = List(FileWithKV(data0001.json,a,1), FileWithKV(data0002.json,b,2), FileWithKV(data0003.json,c,3))

然后我们从一个初始数据框开始,从我们集合的头部开始,然后将开始向左折叠以构建将保存所有文件的整个数据框,所有列都是从 KV 动态生成的
val head = filesWithKVs.head
val initialDf = spark
.read.json(head.filename)
.withColumn(s"new_col_1", lit(head.key)) 
.withColumn(s"new_col_2", lit(head.value))

现在折叠部分
val dfAll = filesWithKVs.tail.foldLeft(initialDf)((df, fileWithKV) => {
    val newDf = spark
    .read.json(fileWithKV.filename)
    .withColumn(s"new_col_1", lit(fileWithKV.key)) 
    .withColumn(s"new_col_2", lit(fileWithKV.value))
    // union the dataframes to capture file by file, key value with key value
    df.union(newDf)
})

数据框将如下所示,假设在 json 文件中将有一个名为 bar 的列和一个值 foo,对于 3 个 json 文件中的每一个
+---+----------+----------+
|bar|new_col_1 |new_col_2 |
+---+----------+----------+
|foo|         a|         1|
|foo|         b|         2|
|foo|         c|         3|
+---+----------+----------+

关于json - 将 JSON 文件读入 Spark 数据集并从单独的 Map 添加列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45445011/

相关文章:

python - 获取 Pyspark 中缺失评级的评级列表列,其中 0 已到位

Python Pandas 从复杂字典中创建记录

php - 在php中将三个mysql查询组合成一个json数组

javascript - 获取 JSON 中数组对象的名称

iphone - 如何在控制台 Xcode 中打印?

scala - 在 SBT 中过滤资源

arrays - Spark中iterable与Array的关系

apache-spark - 如何在 spark 提交期间指定输出日志文件

scala - 错误 : value is not a member of object using Scala on the shell

arrays - 在 Scala 中打印数组