scala - How to save the output of multiple queries under a single JSON file in append mode using Spark Scala

Tags: scala apache-spark hadoop

I have 5 queries as below:

select * from table1  
select * from table2  
select * from table3  
select * from table4  
select * from table5  
Now, what I want is to execute these queries sequentially and keep appending their output to a single JSON file. I wrote the following code, but it stores the output of each query in different part files instead of one.
Below is my code:
def store(jobEntity: JobDetails, jobRunId: Int): Unit = {
    UDFUtil.registerUdfFunctions()
    var outputTableName: String = null
    val jobQueryMap = jobEntity.jobQueryList.map(jobQuery => (jobQuery.sequenceId, jobQuery))
    val sortedQueries = scala.collection.immutable.TreeMap(jobQueryMap.toSeq: _*).toMap
    LOGGER.debug("sortedQueries ===>" + sortedQueries)
    try {
      outputTableName = jobEntity.destinationEntity
      var resultDF: DataFrame = null
      sortedQueries.values.foreach(jobQuery => {
        LOGGER.debug(s"jobQuery.query ===> ${jobQuery.query}")
        resultDF = SparkSession.builder.getOrCreate.sqlContext.sql(jobQuery.query)

        if (jobQuery.partitionColumn != null && !jobQuery.partitionColumn.trim.isEmpty) {
          resultDF = resultDF.repartition(jobQuery.partitionColumn.split(",").map(col): _*)
        }
        if (jobQuery.isKeepInMemory) {
          resultDF = resultDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
        }
        if (jobQuery.isCheckpointEnabled) {
          val checkpointDir = ApplicationConfig.getAppConfig(JobConstants.CHECKPOINT_DIR)
          val fs = FileSystem.get(new Storage(JsonUtil.toMap[String](jobEntity.sourceConnection)).asHadoopConfig())
          val path = new Path(checkpointDir)
          if (!fs.exists(path)) {
            fs.mkdirs(path)
          }
          resultDF.explain(true)
          SparkSession.builder.getOrCreate.sparkContext.setCheckpointDir(checkpointDir)
          resultDF = resultDF.checkpoint
        }
        resultDF = {
          if (jobQuery.isBroadCast) {
            import org.apache.spark.sql.functions.broadcast
            broadcast(resultDF)
          } else
            resultDF
        }
        tempViewsList.+=(jobQuery.queryAliasName)
        resultDF.createOrReplaceTempView(jobQuery.queryAliasName)
        //      resultDF.explain(true)
        val map: Map[String, String] = JsonUtil.toMap[String](jobEntity.sinkConnection)
        LOGGER.debug("sink details :: " + map)
        if (resultDF != null && !resultDF.take(1).isEmpty) {
          resultDF.show(false)
          val sinkDetails = new Storage(JsonUtil.toMap[String](jobEntity.sinkConnection))
          val path = sinkDetails.basePath + File.separator + jobEntity.destinationEntity
          println("path::: " + path)
          resultDF.repartition(1).write.mode(SaveMode.Append).json(path)
        }
      }
      )
Just ignore the other things (Checkpointing, Logging, Auditing) that I am doing in this method along with the reading and writing.

Best Answer

Use the following example as a reference for your problem.
I have three tables with JSON data (each with a different schema) as below:

  • table1 --> Personal details table
  • table2 --> Company details table
  • table3 --> Salary details table

  • I am reading these three tables one by one in sequential mode, as per your requirement, and doing a small transformation on the data (exploding the JSON array column) with the help of the List TableColList, which holds the array column name for each table, separated by a colon (":"). OutDFList is the list of all the transformed DataFrames.
    Finally, I reduce all the DataFrames from OutDFList into a single DataFrame and write it into one JSON file.

    Note: I have used join to reduce all the DataFrames. You can also use union (if they have the same columns) or anything else as per your requirement; a sketch of the union alternative is shown after the example below.


    Check the code below:
    scala> spark.sql("select * from table1").printSchema
    root
     |-- Personal: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- DOB: string (nullable = true)
     |    |    |-- EmpID: string (nullable = true)
     |    |    |-- Name: string (nullable = true)
    
    
    scala> spark.sql("select * from table2").printSchema
    root
     |-- Company: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- EmpID: string (nullable = true)
     |    |    |-- JoinDate: string (nullable = true)
     |    |    |-- Project: string (nullable = true)
    
    
    scala> spark.sql("select * from table3").printSchema
    root
     |-- Salary: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- EmpID: string (nullable = true)
     |    |    |-- Monthly: string (nullable = true)
     |    |    |-- Yearly: string (nullable = true)
    
    scala> val TableColList = List("table1:Personal", "table2:Company", "table3:Salary")
    TableColList: List[String] = List(table1:Personal, table2:Company, table3:Salary)
    
    
    scala>  val OutDFList = TableColList.map{ X =>
         |  val table = X.split(":")(0)
         |  val arrayColumn = X.split(":")(1)
         |  val df = spark.sql(s"""SELECT * FROM """ + table).select(explode(col(arrayColumn)) as "data").select("data.*")
         | df}
    OutDFList: List[org.apache.spark.sql.DataFrame] = List([DOB: string, EmpID: string ... 1 more field], [EmpID: string, JoinDate: string ... 1 more field], [EmpID: string, Monthly: string ... 1 more field])
    
    scala> val FinalOutDF  = OutDFList.reduce((df1, df2) => df1.join(df2, "EmpID"))
    FinalOutDF: org.apache.spark.sql.DataFrame = [EmpID: string, DOB: string ... 5 more fields]
    
    scala> FinalOutDF.printSchema
    root
     |-- EmpID: string (nullable = true)
     |-- DOB: string (nullable = true)
     |-- Name: string (nullable = true)
     |-- JoinDate: string (nullable = true)
     |-- Project: string (nullable = true)
     |-- Monthly: string (nullable = true)
     |-- Yearly: string (nullable = true)
    
    
    scala> FinalOutDF.write.json("/FinalJsonOut")
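
    As noted above, union can replace the join when reducing OutDFList, but only when every exploded DataFrame ends up with the same columns (which is not the case for the three example tables here). The sketch below is a minimal illustration of that alternative, reusing the table and column names from the example; the value names (tableColList, outDFList, combined) are illustrative, and coalesce(1) plus SaveMode.Append is just one way to keep each run's result in a single part file inside the target directory while appending across runs.

    import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
    import org.apache.spark.sql.functions.{col, explode}

    val spark = SparkSession.builder.getOrCreate()

    // Same per-table transformation as above: explode the JSON array column of each table.
    val tableColList = List("table1:Personal", "table2:Company", "table3:Salary")
    val outDFList: List[DataFrame] = tableColList.map { entry =>
      val table = entry.split(":")(0)
      val arrayColumn = entry.split(":")(1)
      spark.sql(s"SELECT * FROM $table")
        .select(explode(col(arrayColumn)).as("data"))
        .select("data.*")
    }

    // Alternative reduction: union by column name instead of join.
    // Only valid when all DataFrames share the same schema.
    val combined = outDFList.reduce((a, b) => a.unionByName(b))

    // coalesce(1) writes a single part file per run inside /FinalJsonOut;
    // SaveMode.Append adds a new part file on each run instead of overwriting.
    combined
      .coalesce(1)
      .write
      .mode(SaveMode.Append)
      .json("/FinalJsonOut")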
    

    Regarding "scala - How to save the output of multiple queries under a single JSON file in append mode using Spark Scala", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/62638534/
