I am trying to practice big-data MapReduce by building a movie recommendation system. My code:
*imports
public class MRS {
    public static class Map extends Mapper<LongWritable, Text, Text, Text> {
        public void map(LongWritable key, Text value, Context con)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer token = new StringTokenizer(line);
            while (token.hasMoreTokens()) {
                String userId = token.nextToken();
                String movieId = token.nextToken();
                String ratings = token.nextToken();
                token.nextToken();
                con.write(new Text(userId), new Text(movieId + "," + ratings));
            }
        }
    }

    public static class Reduce extends
            Reducer<Text, IntWritable, Text, Text> {
        public void reduce(Text key, Iterable<Text> value, Context con)
                throws IOException, InterruptedException {
            int item_count = 0;
            int item_sum = 0;
            String result = "[";
            for (Text t : value) {
                String s = t.toString();
                StringTokenizer token = new StringTokenizer(s, ",");
                while (token.hasMoreTokens()) {
                    token.nextToken();
                    item_sum = item_sum + Integer.parseInt(token.nextToken());
                    item_count++;
                }
                result = result + "(" + s + "),";
            }
            result = result.substring(0, result.length() - 1);
            result = result + "]";
            result = String.valueOf(item_count) + "," + String.valueOf(item_sum) + "," + result;
            con.write(key, new Text(result));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration con = new Configuration();
        Job job = new Job(con, "Movie Recommendation");
        job.setJarByClass(MRS.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
I am using the MovieLens dataset, with u.data as the input file.
After running this code, my output should look like this:
userId Item_count,Item_sum,[list of movie_Id with rating]
However, I got this:
99 173,4
99 288,4
99 66,3
99 203,4
99 105,2
99 12,5
99 1,4
99 741,3
99 895,3
99 619,4
99 742,5
99 294,4
99 196,4
99 328,4
99 120,2
99 246,3
99 232,4
99 181,5
99 201,3
99 978,3
99 123,3
99 433,4
99 345,3
This appears to be the output of the Map class.
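To make that observation concrete, here is a minimal plain-Java sketch of what the map step does to a single input line. It uses no Hadoop types; `MapStepSketch` and `toPair` are hypothetical names introduced only for illustration, and the sample line (including its timestamp) is made up.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

class MapStepSketch {
    // Hypothetical helper mirroring the mapper body:
    // one input line in, one (userId, "movieId,rating") pair out.
    static Map.Entry<String, String> toPair(String line) {
        String[] fields = line.trim().split("\\s+");
        if (fields.length < 4) {
            throw new IllegalArgumentException("expected 4 fields, got " + fields.length);
        }
        String userId = fields[0];
        String movieId = fields[1];
        String rating = fields[2];
        // fields[3] is the timestamp, which the mapper skips.
        return new SimpleEntry<>(userId, movieId + "," + rating);
    }

    public static void main(String[] args) {
        // Illustrative line; the timestamp value is made up.
        Map.Entry<String, String> pair = toPair("99\t173\t4\t0");
        // Key and value joined by a tab, the default TextOutputFormat separator.
        System.out.println(pair.getKey() + "\t" + pair.getValue());
    }
}
```

Each pair printed this way has the same shape as the lines in the job output above, which is consistent with the reducer logic never having been applied.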
Best Answer
I made a few adjustments to the code, and it now gives me exactly the expected result. Here is my new code:
*imports
public class MRS {
    public static class Map extends
            Mapper<LongWritable, Text, IntWritable, Text> {
        public void map(LongWritable key, Text value, Context con)
                throws IOException, InterruptedException {
            // Each u.data line holds: user id, movie id, rating, timestamp.
            String line = value.toString();
            String[] s = line.split("\t");
            StringTokenizer token = new StringTokenizer(line);
            while (token.hasMoreTokens()) {
                IntWritable userId = new IntWritable(Integer.parseInt(token
                        .nextToken()));
                String movieId = token.nextToken();
                String ratings = token.nextToken();
                token.nextToken(); // skip the timestamp
                con.write(userId, new Text(movieId + "," + ratings));
            }
        }
    }

    // The input types here now match the mapper's output types (IntWritable, Text),
    // so reduce() below overrides Reducer.reduce() instead of overloading it.
    public static class Reduce extends
            Reducer<IntWritable, Text, IntWritable, Text> {
        public void reduce(IntWritable key, Iterable<Text> value, Context con)
                throws IOException, InterruptedException {
            int item_count = 0;
            int item_sum = 0;
            String result = "";
            for (Text t : value) {
                String s = t.toString();
                StringTokenizer token = new StringTokenizer(s, ",");
                result = result + "[" + s + "],";
            }
            // Drop the leading "[" and the trailing "],".
            result = result.substring(1, result.length() - 2);
            System.out.println(result);
            con.write(key, new Text(result));
        }
    }

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Configuration con = new Configuration();
        Job job = new Job(con, "Movie Recommendation");
        job.setJarByClass(MRS.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
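The reducer above only concatenates the values, whereas the question asked for `Item_count,Item_sum,[list]`. As a minimal sketch of how that string could be assembled, here is the aggregation in plain Java without Hadoop types. `AggregateSketch` and `summarize` are hypothetical names, and the sketch assumes every value has the form `movieId,rating` with an integer rating.

```java
import java.util.Arrays;
import java.util.List;

class AggregateSketch {
    // Hypothetical helper: builds "count,sum,[(movieId,rating),...]"
    // from a list of "movieId,rating" strings.
    static String summarize(List<String> values) {
        int itemCount = 0;
        int itemSum = 0;
        StringBuilder list = new StringBuilder("[");
        for (String s : values) {
            String[] parts = s.split(",");
            itemSum += Integer.parseInt(parts[1].trim());
            itemCount++;
            if (list.length() > 1) {
                list.append(","); // separator between entries, not after the last one
            }
            list.append("(").append(s).append(")");
        }
        list.append("]");
        return itemCount + "," + itemSum + "," + list;
    }

    public static void main(String[] args) {
        // The first three values from the job output shown in the question.
        System.out.println(summarize(Arrays.asList("173,4", "288,4", "66,3")));
        // prints: 3,11,[(173,4),(288,4),(66,3)]
    }
}
```

One caveat if this logic were moved into the reducer: its output no longer has the `movieId,rating` shape, so the same class probably should not also be registered with `setCombinerClass`, because a combiner's output is fed back into the reducer as input.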
What I changed:
Driver code:
    job.setOutputKeyClass(IntWritable.class);
Mapper code:
    Mapper<LongWritable, Text, IntWritable, Text>
Reducer code:
    public static class Reduce extends
            Reducer<IntWritable, Text, IntWritable, Text> {
        public void reduce(IntWritable key, Iterable<Text> value, Context con)
                throws IOException, InterruptedException {
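I think the problem was that the reducer's declared input key and value types did not match the mapper's output types, which is why the job wrote out the mapper output without ever running my reducer.
Please correct me if I am wrong.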
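A likely mechanism behind this is ordinary Java overloading. The original class was declared as `Reducer<Text, IntWritable, Text, Text>`, but its method took `Iterable<Text>`. That method therefore did not override `Reducer.reduce`, and the framework kept calling the inherited default, which passes values through unchanged. The sketch below reproduces the effect in plain Java. `BaseReducer` is a hypothetical stand-in for Hadoop's `Reducer`, not the real class, and the other class names are also made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class OverloadPitfall {
    // Hypothetical stand-in for Hadoop's Reducer:
    // the default reduce() passes every value through unchanged.
    static class BaseReducer<K, V> {
        final List<String> out = new ArrayList<>();

        protected void reduce(K key, Iterable<V> values) {
            for (V v : values) {
                out.add(key + "\t" + v);
            }
        }

        // Plays the framework's role: it only knows the base-class signature.
        final List<String> run(K key, Iterable<V> values) {
            reduce(key, values);
            return out;
        }
    }

    // Declared value type is Integer, but reduce() takes Iterable<String>.
    // This compiles as an overload, so run() never calls it.
    static class Mismatched extends BaseReducer<String, Integer> {
        protected void reduce(String key, Iterable<String> values) {
            out.add(key + "\tcustom");
        }
    }

    // Type parameters agree with the method signature,
    // so @Override compiles and run() dispatches here.
    static class Matched extends BaseReducer<String, Integer> {
        @Override
        protected void reduce(String key, Iterable<Integer> values) {
            int sum = 0;
            for (int v : values) {
                sum += v;
            }
            out.add(key + "\t" + sum);
        }
    }

    public static void main(String[] args) {
        List<Integer> ratings = Arrays.asList(4, 3);
        System.out.println(new Mismatched().run("99", ratings)); // pass-through output
        System.out.println(new Matched().run("99", ratings));    // summed output
    }
}
```

Adding `@Override` to `reduce` in the original job would have turned the mismatch into a compile-time error instead of a silent fallback to the default behavior.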
Regarding "java - Understanding MapReduce code", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/46398218/