我已经阅读过与此相关的以前的帖子,但没有得到任何有意义的信息。
我的用例是:
- 汇总印象和点击数据
- 将点击数据和未点击数据分开放在不同的文件中。
我已经为此编写了映射器和缩减器,但缩减器的输出是包含点击和未点击的数据,并且它在同一个文件中。我想分离该数据,因此点击数据应存在于一个文件中,未点击数据应存在于其他文件中。
错误:
java.lang.IllegalStateException: Reducer has been already set
at org.apache.hadoop.mapreduce.lib.chain.Chain.checkReducerAlreadySet(Chain.java:662)
代码
Configuration conf = new Configuration();
conf.set("mapreduce.output.fileoutputformat.compress", "true");
conf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.GzipCodec");
conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
conf.set("mapreduce.output.fileoutputformat.compress.type", "BLOCK");
Job job = Job.getInstance(conf, "IMPRESSION_CLICK_COMBINE_JOB");
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setReducerClass(ImpressionClickReducer.class);
FileInputFormat.setInputDirRecursive(job, true);
// FileInputFormat.addInputPath(job, new Path(args[0]));
// job.setMapperClass(ImpressionMapper.class);
Path p = new Path(args[2]);
FileSystem fs = FileSystem.get(conf);
fs.exists(p);
fs.delete(p, true);
/**
* Here directory of impressions will be present
*/
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, ImpressionMapper.class);
/**
* Here directory of clicks will be present
*/
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, ClickMapper.class);
FileOutputFormat.setOutputPath(job, new Path(args[2]));
job.setNumReduceTasks(10);
job.setPartitionerClass(TrackerPartitioner.class);
ChainReducer.setReducer(job, ImpressionClickReducer.class, Text.class, Text.class, Text.class, Text.class, new Configuration(false));
ChainReducer.addMapper(job, ImpressionClickMapper.class, Text.class, Text.class, Text.class, Text.class, new Configuration(false));
//Below mentioned line is giving Error
//ChainReducer.setReducer(job, ImpressionAndClickReducer.class, Text.class, Text.class, Text.class, Text.class, new Configuration(false));
job.waitForCompletion(true);
最佳答案
ChainReducer 用于在 Reducer 之后链接 Map 任务,您只能调用一次 setReducer()
( See the code here )。
来自Javadocs :
The ChainReducer class allows to chain multiple Mapper classes after a Reducer within the Reducer task.
Using the ChainMapper and the ChainReducer classes is possible to compose Map/Reduce jobs that look like [MAP+ / REDUCE MAP*]. And immediate benefit of this pattern is a dramatic reduction in disk IO.
因此,您的想法是设置一个 Reducer,然后在其后链接 Map 操作。
听起来你真的想使用 MultipleOutputs . Hadoop Javadocs 提供了一个关于如何使用它的示例。有了它,您可以定义多个输出,并由您决定写入哪个输出键/值。
关于hadoop - 使用 ChainReducer 抛出异常添加多个 Reducer,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39705920/