需要帮助实现最佳实践。 运行环境如下:
- 日志数据文件不定期到达。
- 日志数据文件的大小为 3.9KB 到 8.5MB。平均约为 1MB。
- 一个数据文件的记录数从13行到22000行。平均约为 2700 行。
- 数据文件必须在聚合前进行后处理。
- 可以更改后处理算法。
- 后处理文件与原始数据文件分开管理,因为后处理算法可能会更改。
- 执行每日汇总。所有后处理的数据文件必须逐条记录过滤并计算聚合(平均值,最大最小值......)。
- 由于聚合是细粒度的,所以聚合后的记录数并不是那么少。可以是原始记录数的一半左右。
- 一次处理后的文件数量可达20万个左右。
- 数据文件应该能够单独删除。
在一次测试中,我尝试通过 Spark 处理 160,000 个后处理文件,从带有 glob 路径的 sc.textFile() 开始,它因驱动程序进程中的 OutOfMemory 异常而失败。
处理此类数据的最佳做法是什么? 我应该使用 HBase 而不是普通文件来保存后处理数据吗?
最佳答案
我们编写了自己的加载器。它解决了 HDFS 中小文件的问题。它使用 Hadoop CombineFileInputFormat。 在我们的案例中,它将映射器的数量从 100000 减少到大约 3000,并使工作速度显着加快。
https://github.com/RetailRocket/SparkMultiTool
示例:
import ru.retailrocket.spark.multitool.Loaders
val sessions = Loaders.combineTextFile(sc, "file:///test/*")
// or val sessions = Loaders.combineTextFile(sc, conf.weblogs(), size = 256, delim = "\n")
// where size is split size in Megabytes, delim - line break character
println(sessions.count())
关于hadoop - YARN 上的 Apache Spark : Large number of input data files (combine multiple input files in spark),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24623402/