I need to get the names of the input files that are being streamed from an input directory.
Below is my Spark file streaming code in Scala:
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.OutputMode

    object FileStreamExample {
      def main(args: Array[String]): Unit = {
        val sparkSession = SparkSession.builder.master("local").getOrCreate()
        val input_dir = "src/main/resources/stream_input"
        val ck = "src/main/resources/chkpoint_dir"
        // create a stream from the folder
        val fileStreamDf = sparkSession.readStream.csv(input_dir)
        // inputFiles is called on a streaming DataFrame here; this is the call that fails
        def fileNames() = fileStreamDf.inputFiles.foreach(println(_))
        println("Streaming Started...\n")
        // fileNames() // it throws the same exception here as well
        val query = fileStreamDf.writeStream
          .format("console")
          .outputMode(OutputMode.Append())
          .option("checkpointLocation", ck)
          .start()
        fileNames()
        query.awaitTermination()
      }
    }
However, I get the following exception while streaming:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[src/main/resources/stream_input]
Best answer
You can use the input_file_name() function, defined in org.apache.spark.sql.functions._, to get the name of the file from which each row was read into the DataFrame:
sparkSession.readStream.csv(input_dir).withColumn("FileName", input_file_name())
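
To show how that line might fit into the full program from the question, here is a minimal sketch rather than a definitive implementation. It assumes Spark 2.4 or later (for foreachBatch) and supplies an explicit schema, since file-based streaming sources generally require one unless schema inference is enabled. The two-column schema, the object name FileNameStreamSketch and the helper printFileNames are hypothetical and are not part of the original question or answer. The exception in the question most likely comes from calling inputFiles, a batch-style operation, on a streaming DataFrame; the sketch avoids that call entirely and reads the names from the FileName column instead.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object FileNameStreamSketch {

  // Hypothetical helper: prints the distinct source files seen in one micro-batch.
  def printFileNames(batch: DataFrame, batchId: Long): Unit = {
    val names = batch.select("FileName").distinct().collect().map(_.getString(0))
    println(s"Batch $batchId read from: ${names.mkString(", ")}")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local").getOrCreate()

    // Assumed schema for illustration only; replace it with the real CSV layout.
    val schema = StructType(Seq(
      StructField("id", StringType),
      StructField("value", StringType)
    ))

    // Tag every row with the file it came from, as the answer suggests.
    val withNames = spark.readStream
      .schema(schema)
      .csv("src/main/resources/stream_input")
      .withColumn("FileName", input_file_name())

    // Passing a method value sidesteps the overload ambiguity that
    // foreachBatch can show with inline lambdas on Scala 2.12.
    val query = withNames.writeStream
      .option("checkpointLocation", "src/main/resources/chkpoint_dir")
      .foreachBatch(printFileNames _)
      .start()

    query.awaitTermination()
  }
}
```

If only the rows themselves are needed, the foreachBatch call can be swapped for .format("console") with .option("truncate", "false"), so that the full path in the FileName column is printed next to each row.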
Regarding "scala - Spark File Streaming get file name", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/58362352/