java - Reducer is not being called by the Driver in a MapReduce program

Tags: java hadoop mapreduce

I wrote this program following the MapReduce programming model. The code is below. My Driver class:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MRDriver extends Configured implements Tool
{
    @Override
    public int run(String[] strings) throws Exception {
        if(strings.length != 2)
        {
            // Exactly two arguments are expected: one input path and one output path.
            System.err.println("usage : <inputlocation> <outputlocation>");
            return 1;
        }
        Job job = new Job(getConf(), "multiple files");
        job.setJarByClass(MRDriver.class);
        job.setMapperClass(MRMapper.class);
        job.setReducerClass(MRReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(strings[0]));
        FileOutputFormat.setOutputPath(job, new Path(strings[1]));

        // Exit code 0 on success, 1 on failure.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        System.exit(ToolRunner.run(conf, new MRDriver(), args));
    }
}

My Mapper class:

class MRMapper extends Mapper<LongWritable, Text, Text, Text>
{
    @Override
    public void map(LongWritable key, Text value, Context context)
    {
        try 
        {          
            StringTokenizer iterator;
            String idsimval = null;
            iterator = new StringTokenizer(value.toString(), "\t");
            String id = iterator.nextToken();
            String sentival = iterator.nextToken();
            if(iterator.hasMoreTokens())
                idsimval = iterator.nextToken();
            context.write(new Text("unique"), new Text(id + "_" + sentival + "_" + idsimval));

        } catch (IOException | InterruptedException e) 
        {
            System.out.println(e);
        }
    }
}

My Reducer class:

class MRReducer extends Reducer<Text, Text, Text, Text> {

    String[] records;
    HashMap<Long, String> sentiMap = new HashMap<>();
    HashMap<Long, String> cosiMap = new HashMap<>();
    private String leftIdStr;
    private ArrayList<String> rightIDList, rightSimValList, matchingSimValList, matchingIDList;
    private double leftVal;
    private double rightVal;
    private double currDiff;
    private double prevDiff;
    private int finalIndex;
    Context newContext;
    private int i;

    public void reducer(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException {
        for (Text string : value) {
            records = string.toString().split("_");
            sentiMap.put(Long.parseLong(records[0]), records[1]);
            if (records[2] != null) {
                cosiMap.put(Long.parseLong(records[0]), records[2]);
            }
            if(++i == 2588)
            {
                newContext = context;
                newfun();
            }
            context.write(new Text("hello"), new Text("hii"));

        }
        context.write(new Text("hello"), new Text("hii"));
    }
        void newfun() throws IOException, InterruptedException
        {
            for (HashMap.Entry<Long, String> firstEntry : cosiMap.entrySet()) {
                try {
                    leftIdStr = firstEntry.getKey().toString();
                    rightIDList = new ArrayList<>();
                    rightSimValList = new ArrayList<>();
                    matchingSimValList = new ArrayList<>();
                    matchingIDList = new ArrayList<>();
                    for (String strTmp : firstEntry.getValue().split(" ")) {
                        rightIDList.add(strTmp.substring(0, 18));
                        rightSimValList.add(strTmp.substring(19));
                    }
                    String tmp = sentiMap.get(Long.parseLong(leftIdStr));
                    if ("NULL".equals(tmp)) {
                        leftVal = Double.parseDouble("0");
                    } else {
                        leftVal = Double.parseDouble(tmp);
                    }
                    tmp = sentiMap.get(Long.parseLong(rightIDList.get(0)));
                    if ("NULL".equals(tmp)) {
                        rightVal = Double.parseDouble("0");
                    } else {
                        rightVal = Double.parseDouble(tmp);
                    }
                    prevDiff = Math.abs(leftVal - rightVal);
                    int oldIndex = 0;
                    for (String s : rightIDList) {
                        try {
                            oldIndex++;
                            tmp = sentiMap.get(Long.parseLong(s));
                            if ("NULL".equals(tmp)) {
                                rightVal = Double.parseDouble("0");
                            } else {
                                rightVal = Double.parseDouble(tmp);
                            }
                            currDiff = Math.abs(leftVal - rightVal);
                            if (prevDiff > currDiff) {
                                prevDiff = currDiff;
                            }
                        } catch (Exception e) {
                        }

                    }
                    oldIndex = 0;
                    for (String s : rightIDList) {
                        tmp = sentiMap.get(Long.parseLong(s));
                        if ("NULL".equals(tmp)) {
                            rightVal = Double.parseDouble("0");
                        } else {
                            rightVal = Double.parseDouble(tmp);
                        }
                        currDiff = Math.abs(leftVal - rightVal);
                        if (Objects.equals(prevDiff, currDiff)) {
                            matchingSimValList.add(rightSimValList.get(oldIndex));
                            matchingIDList.add(rightIDList.get(oldIndex));
                        }
                        oldIndex++;
                    }
                    finalIndex = rightSimValList.indexOf(Collections.max(matchingSimValList));
                    newContext.write(new Text(leftIdStr), new Text(" " + rightIDList.get(finalIndex) + ":" + rightSimValList.get(finalIndex)));
                } catch (NumberFormatException nfe) {

                }

            }
        }
}

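What is the problem? Is it in the MapReduce program or in the Hadoop system configuration? Whenever I run this program, it only writes the mapper output to HDFS.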

Best answer

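In your Reducer class you must override the reduce method. You have declared a method named reducer instead, which is incorrect: the framework only ever calls reduce, so your method is never invoked. The inherited default reduce runs in its place and writes each key/value pair through unchanged, which is why the job output looks like the mapper output.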

Try changing the method declaration in your Reducer class to:

    @Override
    public void reduce(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException {
        // ... existing method body, unchanged ...
    }
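
To see why the misnamed method goes unnoticed, here is a minimal sketch that needs no Hadoop installation. BaseReducer, MisnamedReducer, FixedReducer and runJob are hypothetical stand-ins invented for this illustration, not Hadoop classes. They only mimic the relevant behavior: the framework calls reduce and nothing else, and the default reduce passes values through unchanged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OverrideDemo {
    // Hypothetical stand-in for Hadoop's Reducer base class.
    static class BaseReducer {
        // Default behavior: emit every (key, value) pair unchanged.
        void reduce(String key, List<String> values, List<String> out) {
            for (String v : values) {
                out.add(key + "=" + v);
            }
        }
    }

    // The mistake from the question: reducer() is a brand-new method,
    // not an override, so the framework never calls it.
    static class MisnamedReducer extends BaseReducer {
        void reducer(String key, List<String> values, List<String> out) {
            out.add("hello=hii");
        }
    }

    // The fix: override reduce(). With @Override in place, a misspelled
    // method name becomes a compile-time error instead of a silent no-op.
    static class FixedReducer extends BaseReducer {
        @Override
        void reduce(String key, List<String> values, List<String> out) {
            out.add("hello=hii");
        }
    }

    // Mimics the framework: it dispatches to reduce() only.
    static List<String> runJob(BaseReducer r) {
        List<String> out = new ArrayList<>();
        r.reduce("unique", Arrays.asList("a", "b"), out);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(runJob(new MisnamedReducer())); // prints [unique=a, unique=b]
        System.out.println(runJob(new FixedReducer()));    // prints [hello=hii]
    }
}
```

Adding @Override to the reducer method in the question would have made the compiler reject it, because no method with that name exists in the parent class.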

Regarding "java - Reducer is not being called by the Driver in a MapReduce program", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/38326939/
