hadoop - MapReduce program outputs duplicates?

Tags: hadoop

My output contains many duplicate values, so I implemented a reduce function as shown below. However, the reduce still behaves as an identity function: the output is the same whether or not I use it. What is wrong with my reduce function?

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class search 
{      
    public static String str="And";
    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> 
    {
        String mname="";
        public void configure(JobConf job)
        {
             mname=job.get(str);
             job.set(mname,str);
        }

        private Text word = new Text();
        public Text Uinput =new Text("");
        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException 
        {

            String mapstr=mname;
            Uinput.set(mapstr);
            String line = value.toString();
            Text fdata = new Text();

            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens())
            {
                word.set(tokenizer.nextToken());
                fdata.set(line);

                if(word.equals(Uinput))
                output.collect(fdata,new Text(""));
            }

        }
    } 

    public static class SReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> 
    {
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException 
        {

            boolean start = true;
            //System.out.println("inside reduce   :"+input);
            StringBuilder sb = new StringBuilder();
            while (values.hasNext()) 
            {
                if(!start)

                start=false;
                sb.append(values.next().toString());

            }
            //output.collect(key, new IntWritable(sum));
            output.collect(key, new Text(sb.toString()));
        }
    }

public static void main(String[] args) throws Exception {

    JobConf conf = new JobConf(search.class);
    conf.setJobName("QueryIndex");
    //JobConf conf = new JobConf(getConf(), WordCount.class);
    conf.set(str,args[0]);

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);

    conf.setMapperClass(Map.class);
    //conf.setCombinerClass(SReducer.class);
    conf.setReducerClass(SReducer.class);

    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);



    FileInputFormat.setInputPaths(conf, new Path("IIndexOut"));
    FileOutputFormat.setOutputPath(conf, new Path("searchOut"));

    JobClient.runJob(conf);
}
}

Best answer

I haven't looked at the code closely, but one thing I can say for certain is that the boolean variable start is useless as written: the code following if (!start) should be enclosed in braces so that it actually skips duplicate data; otherwise the reducer simply writes out everything it receives from the mappers.
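The underlying Java pitfall is easy to reproduce outside Hadoop: without braces, an if guards only the single statement that follows it, so the append in the original reducer runs unconditionally on every iteration. A minimal, self-contained sketch (the sample values are illustrative, not the actual mapper output):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class DanglingIf {
    public static void main(String[] args) {
        // Three identical values, standing in for one reduce group.
        List<String> values = Arrays.asList("a", "a", "a");
        StringBuilder sb = new StringBuilder();
        boolean start = true;
        Iterator<String> it = values.iterator();
        while (it.hasNext()) {
            // Without braces, the if guards only "start = false;".
            // The append below is NOT conditional and runs every time.
            if (!start)
                start = false;
            sb.append(it.next());
        }
        System.out.println(sb);  // prints "aaa": every duplicate was appended
    }
}
```

The guard has no effect here, so all three duplicates end up in the builder, just as in the original reducer.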

 public static class SReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> 
{
    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException 
    {

        boolean start = true;
        //System.out.println("inside reduce   :"+input);
        StringBuilder sb = new StringBuilder();
        while (values.hasNext()) 
        {
            Text value = values.next(); // consume every value, otherwise the loop never advances
            if(!start)
            {
               sb.append(value.toString());
            }
            start=false;
        }
        //output.collect(key, new IntWritable(sum));
        output.collect(key, new Text(sb.toString()));
    }
}

Alternatively, the best reduce method would be:

public static class SReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> 
  {
  public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException 
{

   //System.out.println("inside reduce   :"+input);
    StringBuilder sb = new StringBuilder();
    sb.append(values.next().toString());

    //output.collect(key, new IntWritable(sum));
    output.collect(key, new Text(sb.toString()));
}
}

since you only care about the first value in the iterator.
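Independently of the Hadoop API, the deduplication this reducer is meant to perform can be sketched in plain Java: the framework groups identical keys before calling reduce, so emitting each key exactly once removes the duplicates. The class name and sample data below are illustrative, not taken from the question:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DedupSketch {
    // Simulates the shuffle phase: identical mapper keys collapse into one group,
    // each carrying a list of (here empty) values.
    static Map<String, List<String>> group(List<String> mapperKeys) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String k : mapperKeys) {
            groups.computeIfAbsent(k, x -> new ArrayList<>()).add("");
        }
        return groups;
    }

    public static void main(String[] args) {
        // Two identical matched lines and one distinct line from the mapper.
        List<String> mapperOutput =
                Arrays.asList("And the cat", "And the cat", "And the dog");
        Map<String, List<String>> groups = group(mapperOutput);
        // The "reducer": emit each key exactly once, ignoring the values.
        for (String key : groups.keySet()) {
            System.out.println(key);
        }
    }
}
```

The duplicate line appears only once in the output, which is exactly what emitting the key once per reduce group buys you.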

Regarding "hadoop - MapReduce program outputs duplicates?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/10339331/
