java - Understanding MapReduce code

Tags: java mapreduce hadoop2 movie

I am trying to get hands-on practice with big data MapReduce by building a movie recommendation system. My code:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MRS {
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, Context con)
                throws IOException, InterruptedException {
            String line = value.toString();

            StringTokenizer token = new StringTokenizer(line);

            while (token.hasMoreTokens()) {
                String userId = token.nextToken();
                String movieId = token.nextToken();
                String ratings = token.nextToken();
                token.nextToken(); // skip the timestamp field
                con.write(new Text(userId), new Text(movieId + "," + ratings));
            }
        }
    }

    public static class Reduce extends
            Reducer<Text, IntWritable, Text, Text> {
        public void reduce(Text key, Iterable<Text> value, Context con)
                throws IOException, InterruptedException {
            int item_count = 0;
            int item_sum = 0;
            String result = "[";
            for (Text t : value) {
                String s = t.toString();
                StringTokenizer token = new StringTokenizer(s, ",");
                while (token.hasMoreTokens()) {
                    token.nextToken();
                    item_sum = item_sum + Integer.parseInt(token.nextToken());
                    item_count++;
                }
                result = result + "(" + s + "),";
            }
            result = result.substring(0, result.length() - 1);
            result = result + "]";
            result = String.valueOf(item_count) + "," + String.valueOf(item_sum) + "," + result;

            con.write(key, new Text(result));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration con = new Configuration();
        Job job = new Job(con, "Movie Recommendation");

        job.setJarByClass(MRS.class);

        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

I am using the MovieLens dataset from here,

where the input file is u.data.

After running this code, my output should look like this:

userId Item_count,Item_sum,[list of movie_Ids with ratings]
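For example, if user 99 had rated only the first three movies shown in the output below, the expected line (a worked example constructed from those pairs) would be:

99  3,11,[(173,4),(288,4),(66,3)]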

However, this is what I got instead:

99  173,4
99  288,4
99  66,3
99  203,4
99  105,2
99  12,5
99  1,4
99  741,3
99  895,3
99  619,4
99  742,5
99  294,4
99  196,4
99  328,4
99  120,2
99  246,3
99  232,4
99  181,5
99  201,3
99  978,3
99  123,3
99  433,4
99  345,3

This should be the output of the Map class.

Best Answer

I made a few adjustments to the code, and it gave me exactly the expected result. Here is my new code:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MRS {
    public static class Map extends
            Mapper<LongWritable, Text, IntWritable, Text> {
        public void map(LongWritable key, Text value, Context con)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer token = new StringTokenizer(line);

            while (token.hasMoreTokens()) {
                IntWritable userId = new IntWritable(Integer.parseInt(token.nextToken()));
                String movieId = token.nextToken();
                String ratings = token.nextToken();
                token.nextToken(); // skip the timestamp field
                con.write(userId, new Text(movieId + "," + ratings));
            }
        }
    }

    public static class Reduce extends
            Reducer<IntWritable, Text, IntWritable, Text> {
        public void reduce(IntWritable key, Iterable<Text> value, Context con)
                throws IOException, InterruptedException {
            String result = "";
            for (Text t : value) {
                String s = t.toString();
                result = result + "[" + s + "],";
            }
            result = result.substring(1, result.length() - 2);

            System.out.println(result); // debug output
            con.write(key, new Text(result));
        }
    }

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Configuration con = new Configuration();
        Job job = new Job(con, "Movie Recommendation");

        job.setJarByClass(MRS.class);

        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
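Note that this version no longer emits the Item_count and Item_sum fields that the question's expected output called for; item_count and item_sum dropped out of the reduce entirely. A minimal sketch of a reduce method that restores them, assuming values still arrive as "movieId,rating" strings and no combiner is set:

    public void reduce(IntWritable key, Iterable<Text> value, Context con)
            throws IOException, InterruptedException {
        int itemCount = 0;                     // number of movies this user rated
        int itemSum = 0;                       // sum of this user's ratings
        StringBuilder list = new StringBuilder();
        for (Text t : value) {
            String s = t.toString();           // "movieId,rating"
            itemSum += Integer.parseInt(s.split(",")[1]);
            itemCount++;
            list.append("(").append(s).append("),");
        }
        if (list.length() > 0) {
            list.setLength(list.length() - 1); // drop the trailing comma
        }
        con.write(key, new Text(itemCount + "," + itemSum + ",[" + list + "]"));
    }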

What I changed is, in the driver code:

job.setOutputKeyClass(IntWritable.class);

in the mapper code:

 Mapper<LongWritable, Text, IntWritable, Text>

and in the reducer code:

 public static class Reduce extends
     Reducer<IntWritable, Text, IntWritable, Text> {
     public void reduce(IntWritable key, Iterable<Text> value, Context con)
             throws IOException, InterruptedException {
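One caveat on a line both versions share: job.setCombinerClass(Reduce.class). A combiner must consume and produce exactly the reducer's input key/value types, and its output must still be safe for the reducer to process a second time; this reduce rewrites "movieId,rating" values into a bracketed string, so whenever Hadoop actually runs it as a combiner, the final output can get mangled. A safer driver sketch simply leaves the combiner out:

    job.setMapperClass(Map.class);
    // no combiner: this reduce changes the value format, so it is not safe
    // to apply it to partial groups of map output
    job.setReducerClass(Reduce.class);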

I think the problem was that the output key and output value types did not match the Mapper class, which is why it was printing the mapper output without the reducer even executing.

Please correct me if I'm wrong.
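For reference, the precise reason the original reducer never ran: Reduce extended Reducer<Text, IntWritable, Text, Text>, so the framework looks for reduce(Text, Iterable<IntWritable>, Context); the posted reduce(Text, Iterable<Text>, Context) has a different signature, never overrides it, and Hadoop falls back to the default identity reduce, which copies the map output straight through. Adding @Override makes the compiler catch this class of bug, as in this sketch:

    public static class Reduce extends Reducer<Text, IntWritable, Text, Text> {
        @Override // compile error here: the superclass method takes Iterable<IntWritable>,
                  // so this reduce(Text, Iterable<Text>, Context) overrides nothing
        public void reduce(Text key, Iterable<Text> value, Context con)
                throws IOException, InterruptedException {
            // ...
        }
    }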

Regarding java - Understanding MapReduce code, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/46398218/
