hadoop - HDFS中小文件瓶颈的解决方案

标签 hadoop amazon-s3 hdfs apache-nifi

我在hdfs中有成千上万的小型csv文件。在将它们合并到单个数据帧之前,我需要分别向每个文件添加一个ID(否则,在合并中将无法区分来自不同文件的数据)。

目前,我依靠yarn分发我创建的进程,这些进程将id添加到每个文件并转换为 Parquet 格式。我发现无论我如何调整群集(大小/执行器/内存),带宽都限制在2000-3000个文件/小时。

for i in range(0,numBatches):
    fileSlice = fileList[i*batchSize:((i+1)*batchSize)]
    p = ThreadPool(numNodes)

    logger.info('\n\n\n --------------- \n\n\n')
    logger.info('Starting Batch : ' + str(i))
    logger.info('\n\n\n --------------- \n\n\n')
    p.map(lambda x: addIdCsv(x), fileSlice)

def addIdCsv(x):
    logId=x[logId]
    filePath=x[filePath]
    fLogRaw = spark.read.option("header", "true").option('inferSchema', 'true').csv(filePath)
    fLogRaw = fLogRaw.withColumn('id', F.lit(logId))
    fLog.write.mode('overwrite').parquet(filePath + '_added')

您可以看到我的集群在CPU上表现不佳。但是,在YARN管理器上,可以100%访问资源。
enter image description here

最好的办法是解决数据管道的这一部分?瓶颈是什么?

更新1
您可以在下面的事件时间线可视化中看到均匀分布的作业。
enter image description here

最佳答案

根据@ cricket_007的建议,Nifi为此问题提供了一个很好的简便解决方案,它具有比原始python更高的可伸缩性并与其他框架更好地集成。这个想法是在写入hdfs之前将文件读入Nifi(在我的情况下,它们在S3中)。读取/写入S3仍然存在固有的瓶颈,但吞吐量约为每小时4.5万个文件。

流程看起来像这样。
Nifi Flow

大多数工作在ReplaceText处理器中完成,该处理器查找行字符'|'的结尾并添加uuid和换行符。
ReplaceText Processor

关于hadoop - HDFS中小文件瓶颈的解决方案,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53647292/

相关文章:

html - 如何使用mapbox gl js添加栅格数据?

amazon-web-services - 通过 AWS [EMR] 提交 Spark 应用程序

linux - 如何查看edgenodes的数量?

hadoop - Hive 数据存储在哪里?

java - Apache Hadoop 2.6 Java堆空间错误

java - 使用 Java NIO 监视服务监视 http 资源

amazon-web-services - AWS CFT 模板 IAM 策略

hadoop - 如何在hdfs中递归查找大于特定大小(x字节)的文件?

hadoop - 如何在 hadoop 中实现加入?

java - 找不到 org.apache.hadoop.mapred.LocalClientProtocolProvider