apache-spark - 一旦写入最终完成,如何处理 HDFS 目录中的新文件?

标签 apache-spark hadoop hdfs spark-structured-streaming

在我的场景中,我将 CSV 文件连续上传到 HDFS。

一旦上传了新文件,我想用 Spark SQL 处理新文件(例如,计算文件中字段的最大值,将文件转换为 parquet)。即我在每个输入文件和转换/处理的输出文件之间有一个一对一的映射。

我正在评估 Spark Streaming 以监听 HDFS 目录,然后使用 Spark 处理“流文件”。

但是,为了处理整个文件,我需要知道“文件流”何时完成。我想将转换应用于整个文件,以保留文件之间端到端的一对一映射。

我怎样才能转换整个文件而不是它的微批处理?

据我所知,Spark Streaming 只能将转换应用于批处理(DStreams 映射到 RDD),而不能一次应用于整个文件(当它的有限流已完成)。

这样对吗?如果是这样,我应该为我的场景考虑什么替代方案?

最佳答案

我第一次尝试可能误解了你的问题......

As far as I know, Spark Streaming can only apply transformation to batches (DStreams mapped to RDDs) and not to the whole file at once (when its finite stream has completed).

Is that correct?

没有。这是正确的。

Spark Streaming 将在 Spark Streaming 的批处理间隔结束时立即对整个文件应用转换,因为该文件已写入 HDFS。

Spark Streaming 将获取文件的当前内容并开始处理它。


As soon as a new file gets uploaded I need to process the new file with Spark/SparkSQL

几乎使用 Spark 是不可能的,因为它的架构从“上传”到 Spark 处理它需要一些时间。

您应该考虑使用全新且 Shiny 的 Structured Streaming或(即将过时)Spark Streaming .

这两种解决方案都支持监视目录中的新文件并在上传新文件后触发 Spark 作业(这正是您的用例)。

引用结构化流的 Input Sources :

In Spark 2.0, there are a few built-in sources.

  • File source - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations.

另请参阅 Spark Streaming 的 Basic Sources :

Besides sockets, the StreamingContext API provides methods for creating DStreams from files as input sources.

File Streams: For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created as:

streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

Spark Streaming will monitor the directory dataDirectory and process any files created in that directory (files written in nested directories not supported).

有一点需要注意:

I would need to know when the "file stream" completes.

不要用 Spark 这样做。

引用 Spark Streaming 的 Basic Sources再次:

  • The files must be created in the dataDirectory by atomically moving or renaming them into the data directory.

  • Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.

总结...当文件完成并准备好使用 Spark 进行处理时,您应该将文件移动到 Spark 监视的目录。这超出了 Spark 的范围。

关于apache-spark - 一旦写入最终完成,如何处理 HDFS 目录中的新文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44375980/

相关文章:

dataframe - pyspark以减少/压缩的小文件数量写入配置单元表

hadoop - HDFS 追加到 SequenceFile 很慢

python - 在 PySpark 中读取文本文件时有没有办法控制分区数

apache-spark - 由于 GCS 中无法重命名错误,Spark Dataproc 作业失败

hadoop - 分布式处理说明

Hadoop Distcp - 增加 distcp.dynamic.max.chunks.tolerable 配置和调整 distcp

hadoop - Spark-SQL 在 yarn-cluster 上的错误 hdfs 权限

mapreduce - 为什么 Spark 比 Hadoop MapReduce 更快

caching - 在Spark Streaming中,我们可以将数据(hashmap)存储在Executor内存中吗

hadoop - 在Shark Hive中创建连接两个现有表的表