I have 5 queries, as shown below:
select * from table1
select * from table2
select * from table3
select * from table4
select * from table5
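Now I need to execute these queries sequentially and keep storing their output in a single JSON file, in append mode. I wrote the code below, but it stores the output of each query in different part files instead of one. Here is my code: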
def store(jobEntity: JobDetails, jobRunId: Int): Unit = {
  UDFUtil.registerUdfFunctions()
  var outputTableName: String = null
  val jobQueryMap = jobEntity.jobQueryList.map(jobQuery => (jobQuery.sequenceId, jobQuery))
  val sortedQueries = scala.collection.immutable.TreeMap(jobQueryMap.toSeq: _*).toMap
  LOGGER.debug("sortedQueries ===>" + sortedQueries)
  try {
    outputTableName = jobEntity.destinationEntity
    var resultDF: DataFrame = null
    sortedQueries.values.foreach(jobQuery => {
      LOGGER.debug(s"jobQuery.query ===> ${jobQuery.query}")
      resultDF = SparkSession.builder.getOrCreate.sqlContext.sql(jobQuery.query)
      if (jobQuery.partitionColumn != null && !jobQuery.partitionColumn.trim.isEmpty) {
        resultDF = resultDF.repartition(jobQuery.partitionColumn.split(",").map(col): _*)
      }
      if (jobQuery.isKeepInMemory) {
        resultDF = resultDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
      }
      if (jobQuery.isCheckpointEnabled) {
        val checkpointDir = ApplicationConfig.getAppConfig(JobConstants.CHECKPOINT_DIR)
        val fs = FileSystem.get(new Storage(JsonUtil.toMap[String](jobEntity.sourceConnection)).asHadoopConfig())
        val path = new Path(checkpointDir)
        if (!fs.exists(path)) {
          fs.mkdirs(path)
        }
        resultDF.explain(true)
        SparkSession.builder.getOrCreate.sparkContext.setCheckpointDir(checkpointDir)
        resultDF = resultDF.checkpoint
      }
      resultDF = {
        if (jobQuery.isBroadCast) {
          import org.apache.spark.sql.functions.broadcast
          broadcast(resultDF)
        } else
          resultDF
      }
      tempViewsList.+=(jobQuery.queryAliasName)
      resultDF.createOrReplaceTempView(jobQuery.queryAliasName)
      // resultDF.explain(true)
      val map: Map[String, String] = JsonUtil.toMap[String](jobEntity.sinkConnection)
      LOGGER.debug("sink details :: " + map)
      if (resultDF != null && !resultDF.take(1).isEmpty) {
        resultDF.show(false)
        val sinkDetails = new Storage(JsonUtil.toMap[String](jobEntity.sinkConnection))
        val path = sinkDetails.basePath + File.separator + jobEntity.destinationEntity
        println("path::: " + path)
        resultDF.repartition(1).write.mode(SaveMode.Append).json(path)
      }
    }
    )
Please ignore the other things I do in this method alongside the reading and writing (checkpointing, logging, auditing).
Best answer
Use the following example as a reference for your problem.
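I have three tables containing JSON data (each with a different schema):
table1 --> personal data table
table2 --> company data table
table3 --> salary data table
I read these three tables one by one, sequentially, as you require, and apply a small transformation to each (exploding the JSON array column) with the help of the list TableColList, which pairs each table name with its array column name, separated by a colon (":"). OutDFList is the list of all the transformed DataFrames. Finally, I reduce all the DataFrames in OutDFList into a single DataFrame and write that DataFrame out as JSON in a single write operation.
Note: I have used join to reduce all the DataFrames; you can also use union (if they have the same columns) or another operation, as your requirement dictates.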
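For the case in the question, where the five queries would be stacked rather than joined, the union variant mentioned in the note could look roughly like the sketch below. It assumes every query returns the same set of columns; the object name UnionQueries and the helper unionAll are hypothetical names introduced only for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object UnionQueries {
  // Builds one DataFrame per query, in the order given, and stacks them.
  // unionByName matches columns by name rather than by position, so this
  // only works when all queries expose the same column names (assumption).
  def unionAll(spark: SparkSession, queries: Seq[String]): DataFrame =
    queries
      .map(q => spark.sql(q))
      .reduce((left, right) => left.unionByName(right))
}
```

With the tables from the question this might be called as UnionQueries.unionAll(spark, Seq("select * from table1", "select * from table2")). Because the combined result is a single DataFrame, it can be written once instead of once per query.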
The join-based spark-shell session looks like this:
scala> spark.sql("select * from table1").printSchema
root
|-- Personal: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- DOB: string (nullable = true)
| | |-- EmpID: string (nullable = true)
| | |-- Name: string (nullable = true)
scala> spark.sql("select * from table2").printSchema
root
|-- Company: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- EmpID: string (nullable = true)
| | |-- JoinDate: string (nullable = true)
| | |-- Project: string (nullable = true)
scala> spark.sql("select * from table3").printSchema
root
|-- Salary: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- EmpID: string (nullable = true)
| | |-- Monthly: string (nullable = true)
| | |-- Yearly: string (nullable = true)
scala> val TableColList = List("table1:Personal", "table2:Company", "table3:Salary")
TableColList: List[String] = List(table1:Personal, table2:Company, table3:Salary)
scala> val OutDFList = TableColList.map{ X =>
| val table = X.split(":")(0)
| val arrayColumn = X.split(":")(1)
| val df = spark.sql(s"""SELECT * FROM """ + table).select(explode(col(arrayColumn)) as "data").select("data.*")
| df}
OutDFList: List[org.apache.spark.sql.DataFrame] = List([DOB: string, EmpID: string ... 1 more field], [EmpID: string, JoinDate: string ... 1 more field], [EmpID: string, Monthly: string ... 1 more field])
scala> val FinalOutDF = OutDFList.reduce((df1, df2) => df1.join(df2, "EmpID"))
FinalOutDF: org.apache.spark.sql.DataFrame = [EmpID: string, DOB: string ... 5 more fields]
scala> FinalOutDF.printSchema
root
|-- EmpID: string (nullable = true)
|-- DOB: string (nullable = true)
|-- Name: string (nullable = true)
|-- JoinDate: string (nullable = true)
|-- Project: string (nullable = true)
|-- Monthly: string (nullable = true)
|-- Yearly: string (nullable = true)
scala> FinalOutDF.write.json("/FinalJsonOut")
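One caveat on the final write: Spark's JSON writer produces a directory containing one part file per partition, and SaveMode.Append adds new part files to that directory rather than appending to an existing file. If exactly one part file is needed, a possible approach is to collapse the combined DataFrame to a single partition before writing. The sketch below is an illustration under that assumption, not part of the original answer; SingleJsonWriter and writeSinglePart are hypothetical names, and SaveMode.Overwrite is swapped in deliberately so that a re-run does not leave extra part files behind.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

object SingleJsonWriter {
  // Collapses the DataFrame to one partition so that the output directory
  // ends up with a single part-*.json file. All rows flow through one task,
  // so this is only reasonable for modestly sized results.
  def writeSinglePart(df: DataFrame, outputDir: String): Unit =
    df.coalesce(1)
      .write
      .mode(SaveMode.Overwrite) // Append would add another part file per run
      .json(outputDir)
}
```

For the session above, this might be used as SingleJsonWriter.writeSinglePart(FinalOutDF, "/FinalJsonOut"). The result is still a directory; the single JSON file is the part file inside it.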
Regarding scala - how to save the output of multiple queries under a single JSON file in append mode using Spark Scala, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/62638534/