我已经在 HDFS 中复制了一个包含 1000 万行的文件。需要在mapper中处理行号5000到500000。我该怎么做?
我尝试在映射器中重写 run() 方法并尝试在那里设置计数器。但是当文件被分割并且多个映射器正在运行时,当然会有多个计数器在运行。所以这没有帮助。粘贴下面的代码。
@Override
public void run(Mapper<LongWritable, Text, Text, Text>.Context context)
throws IOException, InterruptedException {
setup(context);
Integer counter = 0;
while (context.nextKeyValue()) {
LongWritable currentKey = context.getCurrentKey();
Text currentValue = context.getCurrentValue();
System.out.println(currentKey.toString());
map(currentKey, currentValue, context);
counter++;
}
System.out.println("Counter: " + counter + " Time: "
+ System.currentTimeMillis());
}
此外,我在映射器中获得的 key 不是行号,而是行的偏移量。我们能得到指向行号的KEY吗?如果是这样,它在多个映射器中是否是唯一的? (当前 KEY,即偏移量,在映射器之间不是唯一的)。
我怎样才能做到正确?
最佳答案
默认的InputFormat(例如TextInputFormat)将给出记录的字节偏移量而不是实际的行号 - 这主要是由于当输入文件可分割并被处理时无法确定真实的行号两个或更多映射器。
您可以创建自己的 InputFormat 来生成行号而不是字节偏移量,但您需要配置输入格式以从 isSplittable 方法返回 false(大型输入文件不会由多个映射器处理)。如果您有小文件或大小接近 HDFS block 大小的文件,那么这不是问题。
您还可以使用 Pig 来清理数据并获取那些特别感兴趣的行并处理该特定数据。
我觉得这是 Hadoop 的一个缺点,当你想在不同系统之间共享全局状态时,Hadoop 会失败。
关于java - 从 MapReduce 映射器中的输入文件获取唯一行号,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29786397/