java - 当对象由仅映射的 mapred 作业映射时,是否可以将对象的实例与一个文件关联?

标签 java hadoop mapreduce accumulo

我想使用一个 HashSet,该 HashSet 在映射一个文件时存在/适用,然后在映射下一个文件时重置/重新创建。我修改了 TextInputFormat 以覆盖 isSplitable 以返回 false,以便文件不会被分割并由 Mappers 作为一个整体进行处理。可以做这样的事情吗?或者是否有其他方法可以减少对 Accumulo 表的写入次数?

首先,我不相信我想要一个全局变量。我只是想确保唯一性,从而在我的 Accumulo 表中写入更少的突变。

我的项目是将 Index.java 文件的功能从线性 Accumulo 客户端程序的分片示例转换为使用 MapReduce 功能的程序,同时仍然在 Accumulo 中创建相同的表。它需要是mapreduce,因为这是流行词,本质上它比针对TB 级数据的线性程序运行得更快。

以下是索引代码供引用: http://grepcode.com/file/repo1.maven.org/maven2/org.apache.accumulo/examples-simple/1.4.0/org/apache/accumulo/examples/simple/shard/Index.java

该程序使用 BatchWriter 将 Mutations 写入 Accumulo,并在每个文件的基础上执行此操作。为了确保它不会编写不必要的突变并确保唯一性(尽管我相信 Accumulo 最终会通过压缩合并相同的键),Index.java 有一个 HashSet,用于确定之前是否已经运行过某个单词。这都比较容易理解。

迁移到仅映射的 MapReduce 作业更加复杂。

这是我在映射方面的尝试,从我看到的 Accumulo 表的部分输出来看,这似乎有点工作,但与线性程序 Index.java 相比,运行速度非常慢

public static class MapClass extends Mapper<LongWritable,Text,Text,Mutation> {
        private HashSet<String> tokensSeen = new HashSet<String>();
        @Override
        public void map(LongWritable key, Text value, Context output) throws IOException {
            FileSplit fileSplit = (FileSplit)output.getInputSplit();
            System.out.println("FilePath " + fileSplit.getPath().toString());
            String filePath = fileSplit.getPath().toString();
            filePath = filePath.replace("unprocessed", "processed");

            String[] words = value.toString().split("\\W+");

            for (String word : words) {
                Mutation mutation = new Mutation(genPartition(filePath.hashCode() % 10));
                word = word.toLowerCase();
                if(!tokensSeen.contains(word)) {
                    tokensSeen.add(word);
                    mutation.put(new Text(word), new Text(filePath), new Value(new byte[0]));
                }

                try {
                    output.write(null, mutation);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

缓慢的问题可能是因为我在一个测试实例上运行所有这些,这是一个带有 ZooKeeper 和 Accumulo 的 Hadoop 单节点实例。如果是这样的话,我只需要找到一个唯一性的解决方案。

非常感谢您提供的任何帮助或建议。

最佳答案

Mapper 有 setupcleanup 方法,您可以重写它们以更干净地处理此类事情。 setup 被调用一次,然后 map 被调用多次(每条记录一次),最后调用一次 cleanup 。这个想法是,您在 setup 方法中创建 HashSet,在 map 中构建它,并在 cleanup 中提交所有内容,或者定期刷新如有必要,在一些对 map 的调用中。

但是,在迁移到真正的集群之前,您几乎肯定不会看到运行时有任何改进。单节点测试实例与简单的线性程序相比几乎没有任何优势,只是一旦获得真正的 hadoop 集群,相同的代码将运行得更快。

关于java - 当对象由仅映射的 mapred 作业映射时,是否可以将对象的实例与一个文件关联?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19014834/

相关文章:

Python:使用 yelps MRJOB 增加 EMR 中的超时值

java - 计算 3D 空间中的一个点

java - 有没有办法在Java应用程序中启用鼠标滚轮(用于滚动)?

java - 这个简单的基于 Swing 的类是线程安全的吗?

apache - hadoop 作业列表已弃用

hadoop - 在Hadoop中使用Avro输入格式控制拆分大小

java - 我应该在以下示例中使用哪种 JavaFX 数据结构?

sql - Hive 和 Pig 中的不平等加入

hadoop - describe 不能在 Pig 宏中使用?

java - 有没有办法为 TensorFlow 进程设置 JAVA_OPTS