java - Java:读取hadoop reducer的输出文件

标签 java hadoop mapreduce

我正在尝试阅读和分析hadoop中的mapreduce最终输出。以下是“作业”文件中的部分代码。我想使用FileSystem(Hadoop API)读取输出文件,但是,我对将粗体突出显示的代码放在何处(在双胞胎双星之间)有疑问。如果将其放在system.exit下,恐怕代码将被跳过。

public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 3) {
            System.err.println("Usage: format is <in> <out> <keyword>");
            System.exit(2);
        }

        **Path distCache = new Path("/");
        String fileSys = conf.get("fs.default.name");
        HashMap<String, Integer> jobCountMap = new HashMap<String, Integer>();**

        conf.set("jobTest", otherArgs[2]);
        Job job = new Job(conf, "job count");
        job.setJarByClass(JobResults.class);
        job.setMapperClass(JobMapper.class);
        job.setCombinerClass(JobReducer.class);
        job.setReducerClass(JobReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        distCache = new Path(args[2]);
  //      FileSystem fs = distCache.getFileSystem(conf); // for Amazon AWS
        if (fileSys.split(":")[0].trim().equalsIgnoreCase("s3n")) distCache = new Path("s3n:/" + distCache);

        FileSystem fs = FileSystem.get(conf);           // for local cluster

        Path pathPattern = new Path(distCache, "part-r-[0-9]*");
        FileStatus[] list = fs.globStatus(pathPattern);

        for (FileStatus status : list)
        {
//          DistributedCache.addCacheFile(status.getPath().toUri(), conf);
            try {
            BufferedReader brr = new BufferedReader(new FileReader(status.getPath().toString()));
                            String line;
                while ((line = brr.readLine()) != null)
                {
                    String[] resultsCount = line.split("\\|");
                    jobCountMap.put(resultsCount[0], Integer.parseInt(resultsCount[1].trim()));
                }
            } catch (FileNotFoundException e)
            {
                e.printStackTrace();
            } catch (IOException e)
            {
               e.printStackTrace();
            }
        }

        System.out.println("the size of Hashmap is: " + jobCountMap.size());
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

最佳答案

对于System.exit问题,有一个相当简单的解决方案。您在哪里:

 System.out.println("the size of Hashmap is: " + jobCountMap.size());
    System.exit(job.waitForCompletion(true) ? 0 : 1);

而是放置以下内容:
System.out.println("the size of Hashmap is: " + jobCountMap.size());
boolean completionStatus = job.waitForCompletion(true);

//your code here

if(completionStatus==true){
    System.exit(0)
}else{
    System.exit(1)
}

这应该允许您在主要功能中运行所需的任何处理,包括根据需要启动第二个作业。

关于java - Java:读取hadoop reducer的输出文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19604163/

相关文章:

java - 将HashMap传递给jsp

java - 如何获取java html解析器中嵌套标签之间的内容?

hadoop - 使用 hadoop 命令行列出目录(及其子目录)

java - Hadoop 配置文件输出 - 在哪里和什么?

Hadoop 作业 : Error injecting constructor, JAXBException

java - 将Java hadoop作业迁移到dataproc的最佳方法是什么

java - Java 属性中的 Mule 属性占位符访问

java - 此 NavController 不知道导航 [destination_name]

hadoop - mapreduce wordcount 程序中的驱动程序未调用 reducer

string - 在整个集合的字符串字段中查找最常用的词