hadoop - Adding multiple Reducers with ChainReducer throws an exception

Tags: hadoop mapreduce

I have already read earlier posts on this topic, but did not find anything that helped.

My use case is:

  1. Combine impression and click data.
  2. Write clicked and non-clicked data to separate files.

I have written a mapper and a reducer for this, but the reducer's output contains both clicked and non-clicked data in the same file. I want to split that output so that clicked data ends up in one file and non-clicked data in another.

Error:

java.lang.IllegalStateException: Reducer has been already set
    at org.apache.hadoop.mapreduce.lib.chain.Chain.checkReducerAlreadySet(Chain.java:662)

Code:

    Configuration conf = new Configuration();
    conf.set("mapreduce.output.fileoutputformat.compress", "true");
    conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.GzipCodec");
    conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
    conf.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
    Job job = Job.getInstance(conf, "IMPRESSION_CLICK_COMBINE_JOB");
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    job.setReducerClass(ImpressionClickReducer.class);

    FileInputFormat.setInputDirRecursive(job, true);

    // FileInputFormat.addInputPath(job, new Path(args[0]));
    // job.setMapperClass(ImpressionMapper.class);

    Path p = new Path(args[2]);
    FileSystem fs = FileSystem.get(conf);
    // Remove the output directory if it already exists.
    if (fs.exists(p)) {
        fs.delete(p, true);
    }

    /**
     * Here directory of impressions will be present
     */
    MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, ImpressionMapper.class);
    /**
     * Here directory of clicks will be present
     */
    MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, ClickMapper.class);

    FileOutputFormat.setOutputPath(job, new Path(args[2]));

    job.setNumReduceTasks(10);

    job.setPartitionerClass(TrackerPartitioner.class);

    ChainReducer.setReducer(job, ImpressionClickReducer.class,  Text.class, Text.class, Text.class, Text.class, new Configuration(false));

    ChainReducer.addMapper(job, ImpressionClickMapper.class, Text.class, Text.class, Text.class, Text.class, new Configuration(false));

    //Below mentioned line is giving Error
    //ChainReducer.setReducer(job, ImpressionAndClickReducer.class,  Text.class, Text.class, Text.class, Text.class, new Configuration(false));

    job.waitForCompletion(true);

Best Answer

ChainReducer is used to chain Map tasks after a Reducer, and you can only call setReducer() once (see the code here).

From the Javadocs:

The ChainReducer class allows to chain multiple Mapper classes after a Reducer within the Reducer task.

Using the ChainMapper and the ChainReducer classes is possible to compose Map/Reduce jobs that look like [MAP+ / REDUCE MAP*]. And immediate benefit of this pattern is a dramatic reduction in disk IO.

So the idea is that you set a single Reducer and then chain Map operations after it.
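In other words, a single job's chain can contain at most one reduce stage. A minimal sketch of the allowed shape, reusing the Text key/value types and class names from the question's code (calling setReducer() a second time is exactly what raises the "Reducer has been already set" exception):

    // Exactly one reduce stage per job.
    ChainReducer.setReducer(job, ImpressionClickReducer.class,
            Text.class, Text.class, Text.class, Text.class,
            new Configuration(false));

    // Zero or more map stages may follow the reducer.
    ChainReducer.addMapper(job, ImpressionClickMapper.class,
            Text.class, Text.class, Text.class, Text.class,
            new Configuration(false));

    // A second reduce stage is not allowed: calling setReducer() again
    // throws java.lang.IllegalStateException: Reducer has been already set.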

It sounds like what you really want is MultipleOutputs. The Hadoop Javadocs provide an example of how to use it. With it, you can define multiple named outputs and decide, for each key/value pair, which output it is written to.
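A rough sketch of how that could look for this job, assuming the reducer can tell clicked from non-clicked records by inspecting the value (the "CLICK" marker, the named output names and the ClickSplitReducer class below are illustrative assumptions, not code from the question). In the driver, declare the named outputs:

    // Driver: declare two named outputs in addition to the default output.
    MultipleOutputs.addNamedOutput(job, "clicked", TextOutputFormat.class, Text.class, Text.class);
    MultipleOutputs.addNamedOutput(job, "nonClicked", TextOutputFormat.class, Text.class, Text.class);

The reducer would then route each record to one of the named outputs, roughly like this:

    import java.io.IOException;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

    public class ClickSplitReducer extends Reducer<Text, Text, Text, Text> {

        private MultipleOutputs<Text, Text> mos;

        @Override
        protected void setup(Context context) {
            mos = new MultipleOutputs<>(context);
        }

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text value : values) {
                // Assumed convention: clicked records carry a "CLICK" marker in the value.
                if (value.toString().contains("CLICK")) {
                    mos.write("clicked", key, value);      // written to clicked-r-xxxxx files
                } else {
                    mos.write("nonClicked", key, value);   // written to nonClicked-r-xxxxx files
                }
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();
        }
    }

Note that if nothing is written through context.write(), the default part-r-xxxxx files are still created but stay empty; LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class) can be used to suppress them.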

Regarding "hadoop - Adding multiple Reducers with ChainReducer throws an exception", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/39705920/
