hadoop - Iterating over an IntWritable array in a reducer gives "Can only iterate over an array or an instance of java.lang.Iterable"

Tags: hadoop mapreduce

I have written a Driver, Mapper and Reducer program to try out composite keys (multiple fields from the input data set).

The data set looks like this:

Country, State, County, Population (millions)

USA, California, Alameda, 12
USA, California, Santa Clara, 14
USA, Arizona, Abajide, 14

I am trying to work out the total population per country/state. So the reducer should aggregate on the two fields Country + State and output the population.
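With the sample rows above, the output I expect would be along these lines (populations summed per Country + State):

USA, California    26
USA, Arizona       14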

When I iterate over the population values in the reducer code with

for (IntWritable i : values)

I get the compiler error "Can only iterate over an array or an instance of java.lang.Iterable".

So is it not possible to get an iterator over IntWritable values? I was able to get iteration working with the FloatWritable data type.

Many thanks, Nat

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



public class CompositeKeyReducer extends Reducer<Country, IntWritable, Country, FloatWritable> {

    // public class CompositeKeyReducer extends Reducer<Country, IntWritable, Country, IntWritable> {

    public void reduce(Country key, Iterator<IntWritable> values, Context context) throws IOException, InterruptedException {

        int numberofelements = 0;

        int cnt = 0;
        while (values.hasNext()) {
            cnt = cnt + values.next().get();
        }

        // USA, Alameda = 10
        // USA, Santa Clara = 12
        // USA, Sacramento = 12

        float populationinmillions = 0;

        // compiler error on the next line: "Can only iterate over an array or an instance of java.lang.Iterable"
        for (IntWritable i : values) {
            populationinmillions = populationinmillions + i.get();
            numberofelements = numberofelements + 1;
        }

        // context.write(key, new IntWritable(cnt));
        context.write(key, new FloatWritable(populationinmillions));
    }

}

Best answer

Since the full code was not posted, I have not fixed your exact use case; instead, here is a different example that computes an average using IntWritable and FloatWritable:

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvgDriver extends Configured implements Tool{

    public static class ImcdpMap extends Mapper<LongWritable, Text, IntWritable, IntWritable> {

        String record;

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            record = value.toString();
            String[] fields = record.split(",");

            int s_id = Integer.parseInt(fields[0]);
            int marks = Integer.parseInt(fields[2]);
            context.write(new IntWritable(s_id), new IntWritable(marks));
        } // end of map method
    } // end of mapper class


    public static class ImcdpReduce extends Reducer<IntWritable, IntWritable, IntWritable, FloatWritable> {

        protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int s_id = key.get();
            int sum = 0;
            int cnt = 0;

            for (IntWritable value : values) {
                sum = sum + value.get();
                cnt = cnt + 1;
            }

            // cast before dividing so the average keeps its fractional part
            float avg_m = (float) sum / cnt;
            context.write(new IntWritable(s_id), new FloatWritable(avg_m));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        args = new GenericOptionsParser(conf, args).getRemainingArgs();
        String input = args[0];
        String output = args[1];

        Job job = Job.getInstance(conf, "Avg");
        job.setJarByClass(ImcdpMap.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(ImcdpMap.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(ImcdpReduce.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(FloatWritable.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        Path outPath = new Path(output);
        FileOutputFormat.setOutputPath(job, outPath);
        outPath.getFileSystem(conf).delete(outPath, true);

        return (job.waitForCompletion(true) ? 0 : 1);
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new AvgDriver(), args);
        System.exit(exitCode);
    }
}

However, a couple of observations about your code:

In your example you loop over the iterator twice. Why? An iterator can only be traversed once. Some iterator types are cloneable and you could clone it before traversing, but that is not the general case.
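For illustration, a minimal plain-Java sketch (not Hadoop-specific) of why a second pass over the same Iterator would see nothing even if the for-each compiled:

import java.util.Arrays;
import java.util.Iterator;

public class IteratorOnce {
    public static void main(String[] args) {
        Iterator<Integer> values = Arrays.asList(12, 14, 14).iterator();

        int firstPass = 0;
        while (values.hasNext()) {       // first pass consumes every element
            firstPass += values.next();
        }

        int secondPass = 0;
        while (values.hasNext()) {       // iterator is already exhausted, so this body never runs
            secondPass += values.next();
        }

        System.out.println(firstPass + " / " + secondPass);  // prints "40 / 0"
    }
}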

You are also following the old API style in your code. You should make your method take an Iterable instead.
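A minimal sketch of how the reduce method could look with the new-API signature, assuming the Country key class and the rest of the job stay as in your question (untested):

public class CompositeKeyReducer extends Reducer<Country, IntWritable, Country, FloatWritable> {

    @Override  // with Iterable in the signature, @Override confirms this really overrides reduce()
    public void reduce(Country key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        float populationInMillions = 0;

        // a single pass is enough; an Iterable can be used directly in a for-each loop
        for (IntWritable value : values) {
            populationInMillions += value.get();
        }

        context.write(key, new FloatWritable(populationInMillions));
    }
}

Note that with Iterator in the parameter list the method does not actually override the base reduce(), so Hadoop would fall back to the identity reducer even if the class compiled; declaring the parameter as Iterable (and adding @Override) avoids both problems.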

See also this.

Regarding hadoop - Iterating over an IntWritable array in a reducer gives "Can only iterate over an array or an instance of java.lang.Iterable", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/37903825/
