java - Second MR job does not terminate in Hadoop

Tags: java, hadoop, mapreduce

I have a use case where the output of the first MR job should become the input of a second MR job. I implemented this in Hadoop with ControlledJob/JobControl, but at the end of the run I get the exception below.

java.lang.IllegalStateException: Job in state RUNNING instead of DEFINE
    at org.apache.hadoop.mapreduce.Job.ensureState(Job.java:294)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1288)
    at org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob.submit(ControlledJob.java:335)
    at org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl.run(JobControl.java:240)
    at com.hadoop.intellipaat.JoinClickImpressionDetailJob.runMRJobs(JoinClickImpressionDetailJob.java:353)
    at com.hadoop.intellipaat.JoinClickImpressionDetailJob.run(JoinClickImpressionDetailJob.java:421)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at com.hadoop.intellipaat.JoinClickImpressionDetailJob.main(JoinClickImpressionDetailJob.java:309)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)

Source code:
public static void main(String[] args) throws Exception {
        ToolRunner.run(new Configuration(), new JoinClickImpressionDetailJob(), args);
        System.exit(1);
    }

    private static int runMRJobs(String[] args) {
        int result = -1;
        Configuration conf = new Configuration();
        conf.set("mapreduce.output.fileoutputformat.compress", "true");
        conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.GzipCodec");
        conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
        conf.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");

        ControlledJob mrJob1 = null;
        Job firstJob = null;
        try {
            deleteDirectory(args[2], conf);
            mrJob1 = new ControlledJob(conf);
            mrJob1.setJobName("IMPRESSION_CLICK_COMBINE_JOB");
            firstJob = mrJob1.getJob();
            result += firstMapReduceJob(args, firstJob);
        } catch (Exception e) {
            e.printStackTrace();
        }

        System.out.println("First Job Finished=============");

        System.out.println("Second Job Started=============");

        ControlledJob mrJob2 = null;
        try {
            mrJob2 = new ControlledJob(conf);
            deleteDirectory(args[3], conf);
            mrJob2.addDependingJob(mrJob1);
            mrJob2.setJobName("IMPRESSION_CLICK_COMBINE_JOB1");
            Job job2 = mrJob2.getJob();
            result += secondMapReduceJob(args, job2);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Second Job Finished=============");

        JobControl jobControl = new JobControl("Click-Impression-aggregator");
        jobControl.addJob(mrJob1);
        jobControl.addJob(mrJob2);
        jobControl.run();
        return result;
    }

    private static int secondMapReduceJob(String[] args, Job job2) throws IOException, InterruptedException, ClassNotFoundException {
        long startTime = System.currentTimeMillis();

        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setJarByClass(JoinClickImpressionDetailJob.class);

        job2.setInputFormatClass(TextInputFormat.class);
        job2.setOutputFormatClass(TextOutputFormat.class);

        job2.setReducerClass(ImpressionAndClickReducer.class);

        FileInputFormat.setInputDirRecursive(job2, true);
        FileInputFormat.addInputPath(job2, new Path(args[2]));
        job2.setMapperClass(ImpressionClickMapper.class);

        FileOutputFormat.setOutputPath(job2, new Path(args[3]));
        job2.setNumReduceTasks(8);
        job2.setPartitionerClass(ClickNonClickPartitioner.class);
        System.out.println("Time taken : " + (System.currentTimeMillis() - startTime) / 1000);
        return job2.waitForCompletion(true) ? 1 : 0;
    }

    private static int firstMapReduceJob(String[] args, Job job) throws IOException, InterruptedException, ClassNotFoundException {

        long startTime = System.currentTimeMillis();
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setJarByClass(JoinClickImpressionDetailJob.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setReducerClass(ImpressionClickReducer.class);

        FileInputFormat.setInputDirRecursive(job, true);
        /**
         * Here directory of impressions will be present
         */
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, ImpressionMapper.class);
        /**
         * Here directory of clicks will be present
         */
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, ClickMapper.class);

        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        job.setNumReduceTasks(10);

        job.setPartitionerClass(TrackerPartitioner.class);

        System.out.println("Time taken : " + (System.currentTimeMillis() - startTime) / 1000);
        return job.waitForCompletion(true) ? 1 : 0;
    }

private static void deleteDirectory(String args, Configuration conf) throws IOException {
        Path p = new Path(args);
        FileSystem fs = FileSystem.get(conf);
        // only delete the output path if it actually exists
        if (fs.exists(p)) {
            fs.delete(p, true);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        return runMRJobs(args);
    }

Full code:
https://github.com/ragnar-lothbrok/hadoop-demo/blob/master/src/main/java/com/hadoop/intellipaat/JoinClickImpressionDetailJob.java

Best answer

Update the code as below; you can achieve this without JobControl. The exception occurs because firstMapReduceJob() and secondMapReduceJob() already submit the jobs through waitForCompletion(), and jobControl.run() then tries to submit the same jobs a second time, when they are no longer in the DEFINE state.

private int runMRJobs(String[] args) {
        int result = -1;

        // getConf() is provided by Configured via ToolRunner, so this
        // method can no longer be static
        Configuration conf = getConf();
        conf.set("mapreduce.output.fileoutputformat.compress", "true");
        conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.GzipCodec");
        conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
        conf.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");

        Job job1 = Job.getInstance(conf, "IMPRESSION_CLICK_COMBINE_JOB");
        try {
            deleteDirectory(args[2], conf);
            result += firstMapReduceJob(args, job1);
        } catch (Exception e) {
            e.printStackTrace();
        }

        System.out.println("First Job Finished=============");

        System.out.println("Second Job Started=============");

        Job job2 = Job.getInstance(conf, "IMPRESSION_CLICK_COMBINE_JOB1");
        try {
            deleteDirectory(args[3], conf);
            result += secondMapReduceJob(args, job2);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Second Job Finished=============");

        return result;
    }

In your first job, return like below:
return job1.waitForCompletion(true) ? 1 : 0;

And in your second job, like below:
return job2.waitForCompletion(true) ? 1 : 0;

Regarding "java - Second MR job does not terminate in Hadoop", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/39757576/
