比如说,我在同一个文件夹中有成百上千个输入文件 (.csv) 和元数据文件 (.json)。 $HDFS_ROOT/输入文件夹
//输入数据.csv 文件
input_1.csv, input_2.csv..input_N.csv
//输入元数据.json文件
input_1.json, input_2.json..input_N.json
有人可以告诉我如何让每个映射器获得文件对,即整个输入文件 (.csv) 及其元数据文件 (.json)。
注意:input_i.csv 和 input_i.json 应该转到同一个映射器,以便输入及其元数据都对验证有意义。
我尝试过的: 我尝试使用分别从 FileInputFormat 和 RecordReader 扩展的 WholeFileInputFormat 和 WholeFileRecordReader。这仅适用于 .csv 文件。此外,我将 .json 文件放入分布式缓存中,以便映射器可以访问。这不是一个好的解决方案。
最佳答案
在不使用昂贵的 Reducer 的情况下解决这个问题的关键是 InputSplits。每个 InputFormat 都有方法 getSplits,单个拆分是单个 Mapper 的输入,有多少个 InputSplits 就有多少个 Mapper。 在映射器中,可以访问 InputSplit 的实例:
@Override
protected void setup(Context context) throws IOException, InterruptedException {
System.out.println("TRACE 1 " + context.getConfiguration().getClass().getName());
System.out.println("TRACE 2 " + context.getTaskAttemptID().toString());
System.out.println("TRACE 3 " + context.getInputSplit().toString());
}
基于此,我过去使用过 3 种方法:
1) context.getInputSplit()返回一个FileSplit的实例,它有Path getPath()方法。但是您必须注意 CombineFileSplit 和 TaggedInputSplit,它们可能会环绕 FileSplit。使用 CombineFileSplit 如果您不覆盖 CombineFileInputFormat.pools 的默认行为,那么您可能会在同一个 Mapper 中混合具有不同结构的记录而无法区分它们;
2) 一种更简单的方法是使用 context.getInputSplit().toString(),返回的字符串将包含 InputSplit 附加到的路径,效果很好使用 MultipleInputs,但不使用 CombineFileInputFormat。它有点脏,因为您受制于 toString() 方法,不建议将其用于生产系统,但对于快速原型(prototype)来说已经足够了;
3) 要定义您自己的代理 InputFormat 和 InputSplit 实现,类似于 MultipleInputs 方法使用的方法,它依赖于 DelegatingInputFormat,它将可以读取数据的InputFormat的InputSplit包裹起来,放在TaggedInputSplit里面,看源码。在您的情况下,您可以将元数据逻辑隐藏在您自己的 InputFormat 和 InputSplits 中,并使映射器无需知道如何将文件与元数据匹配。您还可以直接将输入路径关联到元数据,而无需依赖命名约定。这种方法非常适合生产系统。
关于java - 如何让每个 hadoop 映射器获得一个文件对,即一个完整的输入文件 (.csv) 和一个完整的元数据文件 (.json),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28391763/