我正在学习 map-reduce 作业。我在作业中做了一件事,我必须更改我的代码以接受另一个文本文件作为输入,并且输出必须显示位置和年份以及最大、最小和平均值。这是我输入的一行示例:
Calgary,AB,2009-01-07,604680,12694,2.5207754,0.065721168,0.025668362,0.972051954,0.037000279,0.022319018,,,0.003641149,,,0.002936745,,,0.016723641
输出应该是这样的:
卡尔加里 2009 年平均值为:最大值:最小值:
这是我的代码,它提供 txt 文件并计算平均值、最小值和最大值:
public class AverageMinMax {
public static class Map extends Mapper<LongWritable,Date,Text,Text> {
//private static final FloatWritable rep= new FloatWritable(1);
public void map(LongWritable key,Text value,Context context)
throws IOException, InterruptedException {
context.write(new Text("Map_Output"), value);
};
}
public static class Combiner extends Reducer<Text,Text,Text,Text>
{
public void reduce(Text key,Iterable<Text> values,Context context) throws IOException,InterruptedException
{
Integer NumberOfValues=0;
double sum=0D;
double min=0D;
double max=0D;
//double min=values.get(0);
Iterator<Text> itr = values.iterator();
//convertString=values(0);
while(itr.hasNext())
{
String TexttoString = itr.next().toString();
Double value = Double.parseDouble(TexttoString);
if(value<min)
{
min=value;
}
if(value>max)
{
max=value;
}
NumberOfValues++;
sum+=value;
}
Double average = sum/NumberOfValues;
context.write(new Text("Combiner_output"), new Text(average + "," + NumberOfValues+","+min+","+max));
};
}
public static class Reduce extends
Reducer<Text,Text,Text,Text> {
public void reduce(Text key, Iterable<Text> values,
Context context) throws IOException, InterruptedException {
Integer totalNumberOfValues= 0;
Double sum=0.00;
Double min=0D;
Double max=0D;
Iterator<Text> itr = values.iterator();
while(itr.hasNext())
{
String TexttoString = itr.next().toString();
String[] split_String = TexttoString.split(",");
Double average = Double.parseDouble(split_String[0]);
Integer NumberOfValues = Integer.parseInt(split_String[1]);
Double minValue=Double.parseDouble(split_String[2]);
Double maxValue=Double.parseDouble(split_String[3]);
if(minValue<min)
{
min=minValue;
}
if(maxValue>max)
{
max=maxValue;
}
sum+=(average*NumberOfValues);
totalNumberOfValues+=NumberOfValues;
}
Double average= sum/totalNumberOfValues;
context.write(new Text("Average and Minimum and Max is"), new Text(average.toString()+" and "+ min.toString()+" and "+ max.toString()));
};
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job=new Job(conf,"AverageMinMax.class");
job.setJarByClass(AverageMinMax.class);
job.setJobName("MapReduceAssignment");
//JobConf conf = new JobConf(Hadoop_map_reduce.class);
//conf.setJobName("Hadoop_assignment");
// Configuration conf = new Configuration();
//Job job = new Job(conf, "maxmin");
//job.setJarByClass(Hadoop_map_reduce.class);
// FileSystem fs = FileSystem.get(conf);
/* if (fs.exists(new Path(args[1]))) {
fs.delete(new Path(args[1]), true);
}*/
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//job.setNumReduceTasks(1);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setCombinerClass(Combiner.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
// FileOutputFormat.setOutputPath(job, new Path(args[1]));
//FileInputFormat.addInputPath(job, new Path("/home/cloudera/Desktop/assign2"));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// FileOutputFormat.setOutputPath(job, new Path(" user/cloudera/output"));
job.waitForCompletion(true);
}
所以,我的第一个问题是我不知道如何在映射器中转换日期以及如何找到 2 个键并在输出中显示。我的意思是如何重写这段代码!
感谢你的帮助
最佳答案
你的问题不是很清楚。所以,我的假设如下:
- 您有一组数据,其中显示了您要处理的位置、日期和一些 double 值
- 您要处理的值从第一个 double 值开始(即 2.5207754,...)。
- 您的平均值是每年整个观察的所有列的平均值。 (即,如果您有 2009 年的 5 个样本,并且每个样本有 5 个值,您需要这 25 个值的平均值)。
- 您的最小值和最大值是相应年份的整个观察结果的最小值和最大值。
如果假设是正确的,我建议你使用Prof. Jeremy Lin's custom datatypes .可能的解决方案如下:
您的 key 将结合到文本中的位置和年份。
String line = value.toString(); String[] tokens = line.split(","); String[] date = tokens[2].split("-"); String year = date[0]; String location = tokens[0]; Text locationYear = new Text(location + " " + year);
您的值将是一个 ArrayListOfDoublesWritable,您可以从我上面提到的存储库中使用它。
ArrayListOfDoublesWritable readings = new ArrayListOfDoublesWritable() for(int i = 5; i < tokens.length(); i++) { readings.add(Double.parseDouble(tokens[i])); }
然后您可以将映射器输出作为 Text 和 ArrayListOfDoublesWritable 发出。
context.write(locationYear, readings);
从这里开始,您可以通过使用数组列表的 Collections 方法,通过计算(平均值、最小值、最大值)来操作缩减器中的映射器输出。
希望对您有所帮助。
关于java - 具有 2 个键的 Mapreduce,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36456090/