scala - Spark File Streaming 获取文件名

标签 scala apache-spark spark-streaming filestream

我需要知道从输入目录流式传输的输入文件的文件名。

下面是scala编程中的spark FileStreaming代码

object FileStreamExample {
  def main(args: Array[String]): Unit = {

    val sparkSession = SparkSession.builder.master("local").getOrCreate()

    val input_dir = "src/main/resources/stream_input"
    val ck = "src/main/resources/chkpoint_dir"

    //create stream from folder
    val fileStreamDf = sparkSession.readStream.csv(input_dir)

    def fileNames() = fileStreamDf.inputFiles.foreach(println(_))

    println("Streaming Started...\n")
    //fileNames() //even here it is throwing the same exception
    val query = fileStreamDf.writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .option("checkpointLocation", ck)
      .start()

    fileNames();

    query.awaitTermination()

  }}

但是面对以下异常,在流式传输时
Exception in thread "main" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
FileSource[src/main/resources/stream_input]

最佳答案

您可以使用 input_file_name()org.apache.spark.sql.functions._ 中定义的函数获取将行导入到数据框中的文件名。

sparkSession.readStream.csv(input_dir).withColumn("FileName", input_file_name())

关于scala - Spark File Streaming 获取文件名,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58362352/

相关文章:

java - 使用 Java 将字符串原子追加到 Windows 中的文件

scala - 在 Scala 中可以尝试懒惰还是热切吗?

java - java中的开放类或隐式类

scala - Spark Streaming 包含文本

java - Spark `FileAlreadyExistsException` when `saveAsTextFile` 即使输出目录不存在

python - pyspark:使用从 kafka 检索的数据来训练 kmeans 流式传输

java - 从 Java 代码访问 Scala 对象

scala - 从 Spark 数据帧生成字段数组

scala - Spark 结构化流与 Hbase 集成

java - Spark 结构化流处理中遇到的问题