java - 具有 2 个键的 Mapreduce

标签 java eclipse hadoop mapreduce cloudera

我正在学习 map-reduce 作业。我在作业中做了一件事,我必须更改我的代码以接受另一个文本文件作为输入,并且输出必须显示位置和年份以及最大、最小和平均值。这是我输入的一行示例: Calgary,AB,2009-01-07,604680,12694,2.5207754,0.065721168,0.025668362,0.972051954,0.037000279,0.022319018,,,0.003641149,,,0.002936745,,,0.016723641

输出应该是这样的: 卡尔加里 2009 年平均值为:最大值:最小值:

这是我的代码,它提供 txt 文件并计算平均值、最小值和最大值:

public class AverageMinMax {



public static class Map extends Mapper<LongWritable,Date,Text,Text> {


    //private static final FloatWritable rep= new  FloatWritable(1);
        public void map(LongWritable key,Text value,Context context)
        throws IOException, InterruptedException {
                context.write(new Text("Map_Output"), value);
        };
    }
      public static class Combiner extends Reducer<Text,Text,Text,Text>
      {
      public void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException
          {
             Integer NumberOfValues=0;
             double sum=0D;
             double min=0D;
             double max=0D;
             //double min=values.get(0);
              Iterator<Text> itr = values.iterator();
              //convertString=values(0);
              while(itr.hasNext())
              {
                  String TexttoString = itr.next().toString();
                  Double value = Double.parseDouble(TexttoString);
                  if(value<min)
                  {
                      min=value;
                  }
                  if(value>max)
                  {
                      max=value;
                  }
                  NumberOfValues++;
                  sum+=value;
              }
               Double average = sum/NumberOfValues;
                context.write(new Text("Combiner_output"), new Text(average + "," + NumberOfValues+","+min+","+max));
          };
      }
 public static class Reduce extends
       Reducer<Text,Text,Text,Text> {
      public void reduce(Text key, Iterable<Text> values,
        Context context) throws IOException, InterruptedException {
           Integer totalNumberOfValues= 0;
          Double sum=0.00;
          Double min=0D;
          Double max=0D;
          Iterator<Text> itr = values.iterator();
            while(itr.hasNext())
          {
              String TexttoString = itr.next().toString();
              String[] split_String = TexttoString.split(",");
              Double average = Double.parseDouble(split_String[0]);
              Integer NumberOfValues = Integer.parseInt(split_String[1]);
              Double minValue=Double.parseDouble(split_String[2]);
              Double maxValue=Double.parseDouble(split_String[3]);
              if(minValue<min)
              {
                  min=minValue;
              }
              if(maxValue>max)
              {
                  max=maxValue;
              }
              sum+=(average*NumberOfValues);
              totalNumberOfValues+=NumberOfValues;   
          } 
          Double average= sum/totalNumberOfValues;
          context.write(new Text("Average and Minimum and Max is"), new Text(average.toString()+" and "+ min.toString()+" and "+ max.toString()));
          };
     }
     public static void main(String[] args) throws Exception {

         Configuration conf = new Configuration();
         Job job=new Job(conf,"AverageMinMax.class");
         job.setJarByClass(AverageMinMax.class);
         job.setJobName("MapReduceAssignment");
         //JobConf conf = new JobConf(Hadoop_map_reduce.class);

        //conf.setJobName("Hadoop_assignment");
         // Configuration conf = new Configuration();
      //Job job = new Job(conf, "maxmin");
      //job.setJarByClass(Hadoop_map_reduce.class);
     // FileSystem fs = FileSystem.get(conf);
    /*  if (fs.exists(new Path(args[1]))) {
       fs.delete(new Path(args[1]), true);
      }*/
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(Text.class);

         //job.setNumReduceTasks(1);

         job.setMapperClass(Map.class);

        job.setReducerClass(Reduce.class);
         job.setCombinerClass(Combiner.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

     FileInputFormat.addInputPath(job, new Path(args[0]));
    //  FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //FileInputFormat.addInputPath(job, new Path("/home/cloudera/Desktop/assign2"));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
      //  FileOutputFormat.setOutputPath(job, new Path(" user/cloudera/output"));
      job.waitForCompletion(true);
     }

所以,我的第一个问题是我不知道如何在映射器中转换日期以及如何找到 2 个键并在输出中显示。我的意思是如何重写这段代码!

感谢你的帮助

最佳答案

你的问题不是很清楚。所以,我的假设如下:

  1. 您有一组数据,其中显示了您要处理的位置、日期和一些 double 值
  2. 您要处理的值从第一个 double 值开始(即 2.5207754,...)。
  3. 您的平均值是每年整个观察的所有列的平均值。 (即,如果您有 2009 年的 5 个样本,并且每个样本有 5 个值,您需要这 25 个值的平均值)。
  4. 您的最小值和最大值是相应年份的整个观察结果的最小值和最大值。

如果假设是正确的,我建议你使用Prof. Jeremy Lin's custom datatypes .可能的解决方案如下:

  1. 您的 key 将结合到文本中的位置和年份。

    String line = value.toString();
    String[] tokens = line.split(",");
    String[] date = tokens[2].split("-");
    String year = date[0];
    String location = tokens[0];
    
    Text locationYear = new Text(location + " " + year);
    
  2. 您的值将是一个 ArrayListOfDoublesWritable,您可以从我上面提到的存储库中使用它。

    ArrayListOfDoublesWritable readings = new ArrayListOfDoublesWritable()
    for(int i = 5; i < tokens.length(); i++)
    {
      readings.add(Double.parseDouble(tokens[i]));
    }
    
  3. 然后您可以将映射器输出作为 Text 和 ArrayListOfDoublesWritable 发出。

    context.write(locationYear, readings);
    

从这里开始,您可以通过使用数组列表的 Collections 方法,通过计算(平均值、最小值、最大值)来操作缩减器中的映射器输出。

希望对您有所帮助。

关于java - 具有 2 个键的 Mapreduce,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36456090/

相关文章:

hadoop 映射溢出大小和 block 大小

hadoop - Apache Spark 通过跨集群访问 hdfs 中的数据

java - 尝试格式化 namenode 时找不到或加载主类; hadoop 在 MAC OS X 10.9.2 上的安装

java - 在不需要现有类的情况下在 java 中创建 JSON 文字

java - Joda-Time 使用 Pattern 从 String 创建 LocalDate

Eclipse 空(测试)更新站点

java - 添加图像时 JFrame 不起作用

java - 在 spring MVC 4 中插入约束外键

java - 使用主机操作系统 java 应用程序 (java Robot) 截取 VMware 虚拟操作系统的屏幕截图

android - Eclipse 插件在更新后停止工作