java - 如何让每个 hadoop 映射器获得一个文件对,即一个完整的输入文件 (.csv) 和一个完整的元数据文件 (.json)

标签 java hadoop mapreduce hbase

比如说,我在同一个文件夹中有成百上千个输入文件 (.csv) 和元数据文件 (.json)。 $HDFS_ROOT/输入文件夹

//输入数据.csv 文件

input_1.csv, input_2.csv..input_N.csv

//输入元数据.json文件

input_1.json, input_2.json..input_N.json

有人可以告诉我如何让每个映射器获得文件对,即整个输入文件 (.csv) 及其元数据文件 (.json)。

注意:input_i.csv 和 input_i.json 应该转到同一个映射器,以便输入及其元数据都对验证有意义。

我尝试过的: 我尝试使用分别从 FileInputFormat 和 RecordReader 扩展的 WholeFileInputFormat 和 WholeFileRecordReader。这仅适用于 .csv 文件。此外,我将 .json 文件放入分布式缓存中,以便映射器可以访问。这不是一个好的解决方案。

最佳答案

在不使用昂贵的 Reducer 的情况下解决这个问题的关键是 InputSplits。每个 InputFormat 都有方法 getSplits,单个拆分是单个 Mapper 的输入,有多少个 InputSplits 就有多少个 Mapper。 在映射器中,可以访问 InputSplit 的实例:

@Override
protected void setup(Context context) throws IOException, InterruptedException {
     System.out.println("TRACE 1 " + context.getConfiguration().getClass().getName());
     System.out.println("TRACE 2 " + context.getTaskAttemptID().toString());
     System.out.println("TRACE 3 " + context.getInputSplit().toString());

}

基于此,我过去使用过 3 种方法:

1) context.getInputSplit()返回一个FileSplit的实例,它有Path getPath()方法。但是您必须注意 CombineFileSplitTaggedInputSplit,它们可能会环绕 FileSplit。使用 CombineFileSplit 如果您不覆盖 CombineFileInputFormat.pools 的默认行为,那么您可能会在同一个 Mapper 中混合具有不同结构的记录而无法区分它们;

2) 一种更简单的方法是使用 context.getInputSplit().toString(),返回的字符串将包含 InputSplit 附加到的路径,效果很好使用 MultipleInputs,但不使用 CombineFileInputFormat。它有点脏,因为您受制于 toString() 方法,不建议将其用于生产系统,但对于快速原型(prototype)来说已经足够了;

3) 要定义您自己的代理 InputFormatInputSplit 实现,类似于 MultipleInputs 方法使用的方法,它依赖于 DelegatingInputFormat,它将可以读取数据的InputFormatInputSplit包裹起来,放在TaggedInputSplit里面,看源码。在您的情况下,您可以将元数据逻辑隐藏在您自己的 InputFormatInputSplits 中,并使映射器无需知道如何将文件与元数据匹配。您还可以直接将输入路径关联到元数据,而无需依赖命名约定。这种方法非常适合生产系统。

关于java - 如何让每个 hadoop 映射器获得一个文件对,即一个完整的输入文件 (.csv) 和一个完整的元数据文件 (.json),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28391763/

相关文章:

java - MapType 在 Spark 3.x : Encoders. bean 中导致 AnalysisException 到包含 map<String, someClass> 的对象失败,这在 Spark 2.4 中工作正常

java - Hadoop - 按前缀聚合

apache-spark - 人们在谈论Hadoop,Spark和大数据时, “intermediate results”是什么意思?

linux - 如何更改hadoop文件系统中的文件权限

java - Hadoop 给 reducer 带来了什么?

hadoop - 作为输入可以使用Apache Pig Load Function Bag吗?

java - 如何处理 "AES/GCM/NoPadding"的 IV 和身份验证标签?

java - 创建每 XXXXms 调用一个方法的计时器

java - 我如何知道可以存储在数组中的元素的最大范围?

hadoop - 如何检查HDFS上文件的格式?