In my scenario, I continuously upload CSV files to HDFS.
As soon as a new file is uploaded, I want to process it with Spark SQL (for example, compute the maximum of a field in the file, or convert the file to Parquet). That is, there is a one-to-one mapping between each input file and its transformed/processed output file.
I am evaluating Spark Streaming to listen to an HDFS directory and then process the "streamed file" with Spark.
However, in order to process the whole file I would need to know when the "file stream" completes. I want to apply the transformation to the entire file, so that the end-to-end one-to-one mapping between files is preserved.
How can I transform the whole file rather than its micro-batches?
As far as I know, Spark Streaming can only apply transformations to batches (DStreams mapped to RDDs) and not to the whole file at once (when its finite stream has completed).
Is that correct? If so, what alternatives should I consider for my scenario?
Best answer
I may have misunderstood your question with my first attempt...
As far as I know, Spark Streaming can only apply transformation to batches (DStreams mapped to RDDs) and not to the whole file at once (when its finite stream has completed).
Is that correct?
No, that is not correct.
Spark Streaming will apply the transformation to the entire file at once, at the end of Spark Streaming's batch interval, once the file has been written to HDFS.
Spark Streaming will take the current content of the file and start processing it.
As soon as a new file gets uploaded I need to process the new file with Spark/SparkSQL
That is close to impossible with Spark alone, since its architecture takes some time from the moment the file "gets uploaded" until Spark processes it.
You should consider the brand new and shiny Structured Streaming, or the (soon obsolete) Spark Streaming.
Both solutions support watching a directory for new files and triggering a Spark job once a new file has been uploaded (which is exactly your use case).
Quoting Structured Streaming's Input Sources:
In Spark 2.0, there are a few built-in sources.
- File source - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations.
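To make the Structured Streaming route concrete, here is a minimal sketch that watches an HDFS directory for new CSV files and writes each batch out as Parquet. The paths, the schema, and the aggregated column ("value") are all hypothetical; note also that `maxFilesPerTrigger` set to 1 makes each trigger pick up at most one file, which roughly preserves your one-file-in, one-output mapping.

```scala
// Sketch only: paths, schema, and column names are placeholders.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder.appName("csv-to-parquet").getOrCreate()

// Streaming file sources require an explicit schema.
val schema = new StructType()
  .add("id", StringType)
  .add("value", DoubleType)

val csvStream = spark.readStream
  .schema(schema)
  .option("maxFilesPerTrigger", 1)   // at most one new file per micro-batch
  .csv("hdfs:///incoming")           // the watched directory

// Convert each newly arrived file to Parquet as it appears.
val query = csvStream.writeStream
  .format("parquet")
  .option("path", "hdfs:///output")
  .option("checkpointLocation", "hdfs:///checkpoints")
  .start()

query.awaitTermination()
```

A `checkpointLocation` is required for the file sink; it is also what lets the query recover without reprocessing files after a restart.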
See also Spark Streaming's Basic Sources:
Besides sockets, the StreamingContext API provides methods for creating DStreams from files as input sources.
File Streams: For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created as:
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
Spark Streaming will monitor the directory dataDirectory and process any files created in that directory (files written in nested directories not supported).
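If you stay on the DStream API instead, `textFileStream` (a convenience wrapper around `fileStream` for plain text files) gives the same directory-watching behavior. A rough sketch, with a hypothetical directory and a hypothetical CSV layout where the numeric field is the second column:

```scala
// Sketch only: directory path and CSV column layout are assumptions.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("dstream-files")
val ssc  = new StreamingContext(conf, Seconds(30))   // batch interval

// Monitors the directory; each batch's RDD contains the full content
// of the files that appeared during that interval.
val lines = ssc.textFileStream("hdfs:///incoming")

lines.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    // e.g. compute the max of the second CSV column
    val maxValue = rdd.map(_.split(",")(1).toDouble).max()
    println(s"max = $maxValue")
  }
}

ssc.start()
ssc.awaitTermination()
```

Because a new file is read in its entirety within a single batch interval, the per-RDD computation here does operate on whole files, not on fragments of them.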
One thing to note, though:
I would need to know when the "file stream" completes.
Don't do that with Spark.
Quoting Spark Streaming's Basic Sources again:
The files must be created in the dataDirectory by atomically moving or renaming them into the data directory.
Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.
To sum up... you should move a file into the directory Spark watches only once the file is complete and ready for processing with Spark. That part is outside the scope of Spark itself.
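The "atomically move the file" step can be done from the uploading side with the Hadoop `FileSystem` API: write (or upload) into a staging directory first, then rename into the watched directory. A rename within HDFS is atomic, so Spark never sees a half-written file. The paths below are hypothetical:

```scala
// Sketch only: staging/watched paths are placeholders.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())

// 1. Upload (or write) the file into a staging directory first...
val staged  = new Path("hdfs:///staging/data.csv")
val watched = new Path("hdfs:///incoming/data.csv")

// 2. ...then atomically rename it into the directory Spark monitors,
//    only once the upload has fully completed.
fs.rename(staged, watched)
```

The same effect can be achieved from the command line with `hdfs dfs -mv` after the upload finishes.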
Regarding "apache-spark - How to process new files in an HDFS directory once their writing has finally completed?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/44375980/