I have written a Driver, a Mapper, and a Reducer to try out a composite key (multiple fields from the input data set).
The data set looks like this:
Country, State, County, Population (in millions)
USA,California,Alameda,12
USA,California,Santa Clara,14
USA,Arizona,Abajide,14
I am trying to find the total population per Country/State, so the reducer should aggregate on the two fields Country + State and emit the population.
When I iterate over the population values (in the reducer code) with
for(IntWritable i:values)
I get the compiler error "Can only iterate over an array or an instance of java.lang.Iterable".
So can't we obtain an iterator over IntWritable values? I was able to make the iteration work with the FloatWritable data type.
Many thanks, Nat
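The compiler error is about the for-each construct itself: in Java, for-each only compiles against arrays or types implementing java.lang.Iterable, and a bare Iterator does not qualify. A minimal illustration outside Hadoop (class and method names here are hypothetical, chosen just for this sketch):

```java
import java.util.Iterator;
import java.util.List;

public class ForEachDemo {
    // for-each compiles: List implements java.lang.Iterable
    static int sumIterable(Iterable<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    // for-each does NOT compile on an Iterator; hasNext()/next() must be used instead
    static int sumIterator(Iterator<Integer> values) {
        int sum = 0;
        // for (int v : values) {}  // error: Can only iterate over an array or an instance of java.lang.Iterable
        while (values.hasNext()) {
            sum += values.next();
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Integer> pops = List.of(12, 14, 14);
        System.out.println(sumIterable(pops));            // 40
        System.out.println(sumIterator(pops.iterator())); // 40
    }
}
```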
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class CompositeKeyReducer extends Reducer<Country, IntWritable, Country, FloatWritable> {
// public class CompositeKeyReducer extends Reducer<Country, IntWritable, Country, IntWritable> {

    public void reduce(Country key, Iterator<IntWritable> values, Context context) throws IOException, InterruptedException {
        int numberofelements = 0;
        int cnt = 0;
        while (values.hasNext()) {
            cnt = cnt + values.next().get();
        }
        //USA, Alameda = 10
        //USA, Santa Clara = 12
        //USA, Sacramento = 12
        float populationinmillions = 0;
        for (IntWritable i : values)
        {
            populationinmillions = populationinmillions + i.get();
            numberofelements = numberofelements + 1;
        }
        // context.write(key, new IntWritable(cnt));
        context.write(key, new FloatWritable(populationinmillions));
    }
}
Best answer
Since the full code is not available, I have not fixed your exact use case. Instead, here is a different example that computes an average using IntWritable and FloatWritable:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class AvgDriver extends Configured implements Tool {

    public static class ImcdpMap extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String record = value.toString();
            String[] fields = record.split(",");
            int s_id = Integer.parseInt(fields[0]);
            int marks = Integer.parseInt(fields[2]);
            context.write(new IntWritable(s_id), new IntWritable(marks));
        } // end of map method
    } // end of mapper class

    public static class ImcdpReduce extends Reducer<IntWritable, IntWritable, IntWritable, FloatWritable> {
        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            int cnt = 0;
            for (IntWritable value : values) {
                sum = sum + value.get();
                cnt = cnt + 1;
            }
            float avg_m = (float) sum / cnt; // cast before dividing to avoid integer truncation
            context.write(key, new FloatWritable(avg_m));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        args = new GenericOptionsParser(conf, args).getRemainingArgs();
        String input = args[0];
        String output = args[1];

        Job job = Job.getInstance(conf, "Avg");
        job.setJarByClass(ImcdpMap.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(ImcdpMap.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(ImcdpReduce.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(FloatWritable.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        Path outPath = new Path(output);
        FileOutputFormat.setOutputPath(job, outPath);
        outPath.getFileSystem(conf).delete(outPath, true);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new AvgDriver(), args);
        System.exit(exitCode);
    }
}
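Note the cast placement in the average: writing `(float) (sum / cnt)` would perform integer division first and truncate the fraction, whereas `(float) sum / cnt` converts before dividing. A quick standalone check (class and method names are hypothetical, just for this sketch):

```java
public class AvgCastDemo {
    static float wrongAvg(int sum, int cnt) {
        return (float) (sum / cnt);   // integer division happens first: 7 / 2 -> 3
    }

    static float rightAvg(int sum, int cnt) {
        return (float) sum / cnt;     // cast first, then divide: 7 / 2 -> 3.5
    }

    public static void main(String[] args) {
        System.out.println(wrongAvg(7, 2)); // 3.0
        System.out.println(rightAvg(7, 2)); // 3.5
    }
}
```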
However, I did notice the following in your code:
You loop over the Iterator twice. Why? An iterator supports only a single traversal. Some iterator types are cloneable, so you could clone one before traversing it, but that is not the general case.
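The single-traversal point is easy to demonstrate in plain Java (no Hadoop needed; names here are hypothetical): after the first loop drains the iterator, a second loop over the same iterator sees nothing.

```java
import java.util.Iterator;
import java.util.List;

public class SinglePassDemo {
    // Drains the same iterator twice; returns {firstPassSum, secondPassSum}.
    static int[] twoPasses(Iterator<Integer> it) {
        int first = 0;
        while (it.hasNext()) {   // first pass drains the iterator
            first += it.next();
        }
        int second = 0;
        while (it.hasNext()) {   // never entered: the iterator is exhausted
            second += it.next();
        }
        return new int[] { first, second };
    }

    public static void main(String[] args) {
        int[] sums = twoPasses(List.of(10, 12, 12).iterator());
        System.out.println(sums[0]); // 34
        System.out.println(sums[1]); // 0
    }
}
```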
You are also following the old API style in your code. Your reduce method should take an Iterable instead.
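With the new (org.apache.hadoop.mapreduce) API the signature becomes `reduce(Country key, Iterable<IntWritable> values, Context context)`, and a single for-each pass can both sum and count. Below is a plain-Java sketch of that loop, with the Hadoop types replaced by Integer so the snippet is self-contained (`Country` is the asker's own key class):

```java
public class ReduceSketch {
    // Mirrors the body of the fixed reducer:
    //   public void reduce(Country key, Iterable<IntWritable> values, Context context)
    // Sums all values in a single pass, as an Iterable allows.
    static float totalPopulation(Iterable<Integer> values) {
        float total = 0f;
        for (int v : values) {  // one pass is sufficient with an Iterable
            total += v;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(totalPopulation(java.util.List.of(12, 14))); // 26.0
    }
}
```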
See also this
Regarding "hadoop - Iterating over an IntWritable array in a reducer gives 'Can only iterate over an array or an instance of java.lang.Iterable'", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/37903825/