我想使用一个 HashSet,该 HashSet 在映射一个文件时存在/适用,然后在映射下一个文件时重置/重新创建。我修改了 TextInputFormat 以覆盖 isSplitable 以返回 false,以便文件不会被分割并由 Mappers 作为一个整体进行处理。可以做这样的事情吗?或者是否有其他方法可以减少对 Accumulo 表的写入次数?
首先,我不相信我想要一个全局变量。我只是想确保唯一性,从而在我的 Accumulo 表中写入更少的突变。
我的项目是将 Index.java 文件的功能从线性 Accumulo 客户端程序的分片示例转换为使用 MapReduce 功能的程序,同时仍然在 Accumulo 中创建相同的表。它需要是mapreduce,因为这是流行词,本质上它比针对TB 级数据的线性程序运行得更快。
该程序使用 BatchWriter 将 Mutations 写入 Accumulo,并在每个文件的基础上执行此操作。为了确保它不会编写不必要的突变并确保唯一性(尽管我相信 Accumulo 最终会通过压缩合并相同的键),Index.java 有一个 HashSet,用于确定之前是否已经运行过某个单词。这都比较容易理解。
迁移到仅映射的 MapReduce 作业更加复杂。
这是我在映射方面的尝试,从我看到的 Accumulo 表的部分输出来看,这似乎有点工作,但与线性程序 Index.java 相比,运行速度非常慢
public static class MapClass extends Mapper<LongWritable,Text,Text,Mutation> {
private HashSet<String> tokensSeen = new HashSet<String>();
@Override
public void map(LongWritable key, Text value, Context output) throws IOException {
FileSplit fileSplit = (FileSplit)output.getInputSplit();
System.out.println("FilePath " + fileSplit.getPath().toString());
String filePath = fileSplit.getPath().toString();
filePath = filePath.replace("unprocessed", "processed");
String[] words = value.toString().split("\\W+");
for (String word : words) {
Mutation mutation = new Mutation(genPartition(filePath.hashCode() % 10));
word = word.toLowerCase();
if(!tokensSeen.contains(word)) {
tokensSeen.add(word);
mutation.put(new Text(word), new Text(filePath), new Value(new byte[0]));
}
try {
output.write(null, mutation);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
缓慢的问题可能是因为我在一个测试实例上运行所有这些,这是一个带有 ZooKeeper 和 Accumulo 的 Hadoop 单节点实例。如果是这样的话,我只需要找到一个唯一性的解决方案。
非常感谢您提供的任何帮助或建议。
最佳答案
Mapper 有 setup
和 cleanup
方法,您可以重写它们以更干净地处理此类事情。 setup
被调用一次,然后 map
被调用多次(每条记录一次),最后调用一次 cleanup
。这个想法是,您在 setup
方法中创建 HashSet,在 map
中构建它,并在 cleanup
中提交所有内容,或者定期刷新如有必要,在一些对 map
的调用中。
但是,在迁移到真正的集群之前,您几乎肯定不会看到运行时有任何改进。单节点测试实例与简单的线性程序相比几乎没有任何优势,只是一旦获得真正的 hadoop 集群,相同的代码将运行得更快。
关于java - 当对象由仅映射的 mapred 作业映射时,是否可以将对象的实例与一个文件关联?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19014834/