java - Hadoop中的MapReduce程序,实现了一个简单的 “People You Might Know”

标签 java hadoop


import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Friends 

public class FriendsMap extends Mapper < LongWritable, Text, Text, IntWritable >
    private Text friendsAB;
    private Text friendsBA;
    private IntWritable one = new IntWritable(1);
    private IntWritable oneLess = new IntWritable(-999999999);
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
            String friendsOfA[] = null;     //This will be all of the friends of the user in this row
            String oneRow[] = value.toString().split("\t,");    //Break the row up into users IDs
            String userA = oneRow[0];       //This is the main user for this row
            for (int i=1; i < oneRow.length; i++)   //Create an array of the rest of the users in this row
                friendsOfA[i-1] = oneRow[i];
            for (int i=0; i < oneRow.length; i++)   //Output the main user in pairs with all friends plus a lagre negative #
                friendsAB.set(userA + " " + friendsOfA[i]);
                context.write(friendsAB, oneLess);
                System.out.println(friendsAB + " " + oneLess);
            for (int i = 0; i < friendsOfA.length; i++)     //Output each friend pair plus the number 1
                for (int j = i + 1; j < friendsOfA.length; j++) 
                    friendsAB.set(friendsOfA[i] + " " + friendsOfA[j]);
                    friendsBA.set(friendsOfA[j] + " " + friendsOfA[i]);
                    context.write(friendsAB, one);
                    context.write(friendsBA, one);
                    System.out.println(friendsAB + " " + one);
                    System.out.println(friendsBA + " " + one);

class FriendReducer extends Reducer < Text, IntWritable, Text, IntWritable > 
        private IntWritable result = new IntWritable(); 
        public void reduce( Text key, Iterable < IntWritable > values, Context context) throws IOException, InterruptedException 
            int sum = 0; 
            for (IntWritable val : values) 
                sum += val.get(); 
            if (sum > 1)
                result.set( sum); 
                context.write( key, result);
            //At this point I have all pairs of users with recomenede friends and a count of how many times they each
            //friend has been recomended to a user.
            //I need to sort by user and then by number of recomendations.
            //Then print the user <tab> all recomendations with commas between them.

public static void main( String[] args) throws Exception 
        Configuration conf = new Configuration();
        Job job = Job.getInstance( conf, "Friends");
        FileInputFormat.addInputPath( job, new Path("input")); 
        FileOutputFormat.setOutputPath( job, new Path("output")); 
        job.setMapperClass( FriendsMap.class); 
        job.setCombinerClass( FriendReducer.class); 
        job.setReducerClass( FriendReducer.class);
        job.setOutputKeyClass( Text.class); 
        job.setOutputValueClass( IntWritable.class);

        System.exit( job.waitForCompletion( true) ? 0 : 1); 


17/11/15 16:05:51 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 17/11/15 16:06:54 INFO Configuration.deprecation: is deprecated. Instead, use dfs.metrics.session-id 17/11/15 16:06:54 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId= 17/11/15 16:06:54 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 17/11/15 16:06:55 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String). 17/11/15 16:06:55 INFO input.FileInputFormat: Total input paths to process : 2 17/11/15 16:07:05 INFO mapred.JobClient: Running job: job_local426825952_0001 17/11/15 16:07:05 INFO mapred.LocalJobRunner: OutputCommitter set in config null 17/11/15 16:07:05 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 17/11/15 16:07:05 INFO mapred.LocalJobRunner: Waiting for map tasks 17/11/15 16:07:05 INFO mapred.LocalJobRunner: Starting task: attempt_local426825952_0001_m_000000_0 17/11/15 16:07:05 WARN mapreduce.Counters: Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead 17/11/15 16:07:05 INFO util.ProcessTree: setsid exited with exit code 0 17/11/15 16:07:05 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@670217f0 17/11/15 16:07:05 INFO mapred.LocalJobRunner: Starting task: attempt_local426825952_0001_m_000001_0 17/11/15 16:07:05 WARN mapreduce.Counters: Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead 17/11/15 16:07:05 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@1688e9ff 17/11/15 16:07:05 INFO mapred.LocalJobRunner: Map task executor complete. 
17/11/15 16:07:05 WARN mapred.LocalJobRunner: job_local426825952_0001 java.lang.Exception: java.lang.RuntimeException: java.lang.NoSuchMethodException: Friends$FriendsMap.() at org.apache.hadoop.mapred.LocalJobRunner$ Caused by: java.lang.RuntimeException: java.lang.NoSuchMethodException: Friends$FriendsMap.() at org.apache.hadoop.util.ReflectionUtils.newInstance( at org.apache.hadoop.mapred.MapTask.runNewMapper( at at org.apache.hadoop.mapred.LocalJobRunner$Job$ at java.util.concurrent.Executors$ at at java.util.concurrent.ThreadPoolExecutor.runWorker( at java.util.concurrent.ThreadPoolExecutor$ at Caused by: java.lang.NoSuchMethodException: Friends$FriendsMap.() at java.lang.Class.getConstructor0( at java.lang.Class.getDeclaredConstructor( at org.apache.hadoop.util.ReflectionUtils.newInstance( ... 8 more 17/11/15 16:07:06 INFO mapred.JobClient: map 0% reduce 0% 17/11/15 16:07:06 INFO mapred.JobClient: Job complete: job_local426825952_0001 17/11/15 16:07:06 INFO mapred.JobClient: Counters: 0


17/11/16 04:28:50 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 17/11/16 04:28:52 INFO Configuration.deprecation: is deprecated. Instead, use dfs.metrics.session-id 17/11/16 04:28:52 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId= 17/11/16 04:28:52 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 17/11/16 04:28:52 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String). 17/11/16 04:28:53 INFO input.FileInputFormat: Total input paths to process : 2 17/11/16 04:28:54 INFO mapred.LocalJobRunner: OutputCommitter set in config null 17/11/16 04:28:54 INFO mapred.JobClient: Running job: job_local1593958162_0001 17/11/16 04:28:54 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 17/11/16 04:28:54 INFO mapred.LocalJobRunner: Waiting for map tasks 17/11/16 04:28:54 INFO mapred.LocalJobRunner: Starting task: attempt_local1593958162_0001_m_000000_0 17/11/16 04:28:54 WARN mapreduce.Counters: Group org.apache.hadoop.mapred.Task$Counter is deprecated. 
Use org.apache.hadoop.mapreduce.TaskCounter instead 17/11/16 04:28:54 INFO util.ProcessTree: setsid exited with exit code 0 17/11/16 04:28:54 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@57d51956 17/11/16 04:28:54 INFO mapred.MapTask: Processing split: file:/home/cloudera/workspace/Assignment4/input/Sample4.txt:0+4106187 17/11/16 04:28:54 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer 17/11/16 04:28:54 INFO mapred.MapTask: io.sort.mb = 100 17/11/16 04:28:55 INFO mapred.MapTask: data buffer = 79691776/99614720 17/11/16 04:28:55 INFO mapred.MapTask: record buffer = 262144/327680 17/11/16 04:28:55 INFO mapred.LocalJobRunner: Starting task: attempt_local1593958162_0001_m_000001_0 17/11/16 04:28:55 WARN mapreduce.Counters: Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead 17/11/16 04:28:55 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@774140b3 17/11/16 04:28:55 INFO mapred.MapTask: Processing split: file:/home/cloudera/workspace/Assignment4/input/Sample4.txt~:0+0 17/11/16 04:28:55 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer 17/11/16 04:28:55 INFO mapred.MapTask: io.sort.mb = 100 17/11/16 04:28:55 INFO mapred.JobClient: map 0% reduce 0% 17/11/16 04:28:55 INFO mapred.MapTask: data buffer = 79691776/99614720 17/11/16 04:28:55 INFO mapred.MapTask: record buffer = 262144/327680 17/11/16 04:28:55 INFO mapred.LocalJobRunner: 17/11/16 04:28:55 INFO mapred.MapTask: Starting flush of map output 17/11/16 04:28:55 INFO mapred.Task: Task:attempt_local1593958162_0001_m_000001_0 is done. And is in the process of commiting 17/11/16 04:28:55 INFO mapred.LocalJobRunner: 17/11/16 04:28:55 INFO mapred.Task: Task 'attempt_local1593958162_0001_m_000001_0' done. 
17/11/16 04:28:55 INFO mapred.LocalJobRunner: Finishing task: attempt_local1593958162_0001_m_000001_0 17/11/16 04:28:55 INFO mapred.LocalJobRunner: Map task executor complete. 17/11/16 04:28:55 WARN mapred.LocalJobRunner: job_local1593958162_0001 java.lang.Exception: java.lang.NullPointerException at org.apache.hadoop.mapred.LocalJobRunner$ Caused by: java.lang.NullPointerException at Friends$ at Friends$ at at org.apache.hadoop.mapred.MapTask.runNewMapper( at at org.apache.hadoop.mapred.LocalJobRunner$Job$ at java.util.concurrent.Executors$ at at java.util.concurrent.ThreadPoolExecutor.runWorker( at java.util.concurrent.ThreadPoolExecutor$ at 17/11/16 04:28:56 INFO mapred.JobClient: Job complete: job_local1593958162_0001 17/11/16 04:28:56 INFO mapred.JobClient: Counters: 16 17/11/16 04:28:56 INFO mapred.JobClient: File System Counters 17/11/16 04:28:56 INFO mapred.JobClient: FILE: Number of bytes read=4674 17/11/16 04:28:56 INFO mapred.JobClient: FILE: Number of bytes written=139416 17/11/16 04:28:56 INFO mapred.JobClient: FILE: Number of read operations=0 17/11/16 04:28:56 INFO mapred.JobClient:
FILE: Number of large read operations=0 17/11/16 04:28:56 INFO mapred.JobClient: FILE: Number of write operations=0 17/11/16 04:28:56 INFO mapred.JobClient: Map-Reduce Framework 17/11/16 04:28:56 INFO mapred.JobClient: Map input records=0 17/11/16 04:28:56 INFO mapred.JobClient: Map output records=0 17/11/16 04:28:56 INFO mapred.JobClient: Map output bytes=0 17/11/16 04:28:56 INFO mapred.JobClient: Input split bytes=125 17/11/16 04:28:56 INFO mapred.JobClient: Combine input records=0 17/11/16 04:28:56 INFO mapred.JobClient: Combine output records=0 17/11/16 04:28:56 INFO mapred.JobClient: Spilled Records=0 17/11/16 04:28:56 INFO mapred.JobClient: CPU time spent (ms)=0 17/11/16 04:28:56 INFO mapred.JobClient: Physical memory (bytes) snapshot=0 17/11/16 04:28:56 INFO mapred.JobClient: Virtual memory (bytes) snapshot=0 17/11/16 04:28:56 INFO mapred.JobClient: Total committed heap usage (bytes)=363696128


Screen Shot of some errors

public static class FriendsMap extends Mapper < LongWritable, Text, Text, IntWritable >
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 

            String friendsOfA[];    //This will be all of the friends of the user in this row
            friendsOfA = new String[] {};
            String friendsAB        = "1";      //This will be used to create pairs of users
            String friendsBA        = "2";      //This will be used to create pairs of users
            Text pairA;
            Text pairB; 
            IntWritable one     = new IntWritable(1);           //1 if they are not an existing pair here
            IntWritable oneLess = new IntWritable(-999999999);  // if they are an existing pair

            String oneRow[] = value.toString().split("\t,");    //Break the row up into users IDs
            Text userA = new Text(oneRow[0]);                                   //This is the main user for this row
            for (int i=1; i < oneRow.length; i++)   //Create an array of the rest of the users in this row
                friendsOfA[i-1] = oneRow[i];
            for (int i=0; i < oneRow.length; i++)   //Output the main user in pairs with all friends plus a large negative #
            {                                       //We do not want to recommend them as friends because they are friends 
                Text FOA = new Text (friendsOfA[i]);
                friendsAB = (userA + " " + FOA);
                Text pair = new Text (friendsAB);
                context.write(pair, oneLess);
                System.out.println(pair + " " + oneLess);
            for (int i = 0; i < friendsOfA.length; i++)     //Output each friend pair plus the number 1
            {                                               //We want to recommend them as potential friends
                for (int j = i + 1; j < friendsOfA.length; j++) 
                    Text FOA = new Text (friendsOfA[i]);
                    Text FOB = new Text (friendsOfA[j]);
                    friendsAB = (FOA + " " + FOB);
                    friendsBA = (FOB + " " + FOA);
                    pairA = new Text (friendsAB);
                    pairB = new Text (friendsBA);
                    context.write(pairA, one);
                    context.write(pairB, one);
                    System.out.println(pairA + " " + one);
                    System.out.println(pairB + " " + one);


17/11/16 11:59:25 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 17/11/16 11:59:27 INFO Configuration.deprecation: is deprecated. Instead, use dfs.metrics.session-id 17/11/16 11:59:27 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId= 17/11/16 11:59:27 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 17/11/16 11:59:27 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String). 17/11/16 11:59:27 INFO input.FileInputFormat: Total input paths to process : 2 17/11/16 11:59:29 INFO mapred.JobClient: Running job: job_local1899187381_0001 17/11/16 11:59:29 INFO mapred.LocalJobRunner: OutputCommitter set in config null 17/11/16 11:59:29 INFO mapred.LocalJobRunner: OutputCommitter is org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 17/11/16 11:59:29 INFO mapred.LocalJobRunner: Waiting for map tasks 17/11/16 11:59:29 INFO mapred.LocalJobRunner: Starting task: attempt_local1899187381_0001_m_000000_0 17/11/16 11:59:29 WARN mapreduce.Counters: Group org.apache.hadoop.mapred.Task$Counter is deprecated. 
Use org.apache.hadoop.mapreduce.TaskCounter instead 17/11/16 11:59:29 INFO util.ProcessTree: setsid exited with exit code 0 17/11/16 11:59:29 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@4f94aaa1 17/11/16 11:59:29 INFO mapred.MapTask: Processing split: file:/home/cloudera/workspace/Assignment4/input/Sample4.txt:0+4106187 17/11/16 11:59:29 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer 17/11/16 11:59:29 INFO mapred.MapTask: io.sort.mb = 100 17/11/16 11:59:29 INFO mapred.MapTask: data buffer = 79691776/99614720 17/11/16 11:59:29 INFO mapred.MapTask: record buffer = 262144/327680 17/11/16 11:59:29 INFO mapred.LocalJobRunner: Starting task: attempt_local1899187381_0001_m_000001_0 17/11/16 11:59:29 WARN mapreduce.Counters: Group org.apache.hadoop.mapred.Task$Counter is deprecated. Use org.apache.hadoop.mapreduce.TaskCounter instead 17/11/16 11:59:29 INFO mapred.Task: Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@622ecc38 17/11/16 11:59:29 INFO mapred.MapTask: Processing split: file:/home/cloudera/workspace/Assignment4/input/Sample4.txt~:0+0 17/11/16 11:59:29 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer 17/11/16 11:59:29 INFO mapred.MapTask: io.sort.mb = 100 17/11/16 11:59:30 INFO mapred.JobClient: map 0% reduce 0% 17/11/16 11:59:30 INFO mapred.MapTask: data buffer = 79691776/99614720 17/11/16 11:59:30 INFO mapred.MapTask: record buffer = 262144/327680 17/11/16 11:59:30 INFO mapred.LocalJobRunner: 17/11/16 11:59:30 INFO mapred.MapTask: Starting flush of map output 17/11/16 11:59:30 INFO mapred.Task: Task:attempt_local1899187381_0001_m_000001_0 is done. And is in the process of commiting 17/11/16 11:59:30 INFO mapred.LocalJobRunner: 17/11/16 11:59:30 INFO mapred.Task: Task 'attempt_local1899187381_0001_m_000001_0' done. 
17/11/16 11:59:30 INFO mapred.LocalJobRunner: Finishing task: attempt_local1899187381_0001_m_000001_0 17/11/16 11:59:30 INFO mapred.LocalJobRunner: Map task executor complete. 17/11/16 11:59:30 WARN mapred.LocalJobRunner: job_local1899187381_0001 java.lang.Exception: java.lang.ArrayIndexOutOfBoundsException: 0 at org.apache.hadoop.mapred.LocalJobRunner$ Caused by: java.lang.ArrayIndexOutOfBoundsException: 0 at Friends$ at Friends$ at at org.apache.hadoop.mapred.MapTask.runNewMapper( at at org.apache.hadoop.mapred.LocalJobRunner$Job$ at java.util.concurrent.Executors$ at at java.util.concurrent.ThreadPoolExecutor.runWorker( at java.util.concurrent.ThreadPoolExecutor$ at 17/11/16 11:59:31 INFO mapred.JobClient: Job complete: job_local1899187381_0001 17/11/16 11:59:31 INFO mapred.JobClient: Counters: 16 17/11/16 11:59:31 INFO mapred.JobClient: File System Counters 17/11/16 11:59:31 INFO mapred.JobClient: FILE: Number of bytes read=4674 17/11/16 11:59:31 INFO mapred.JobClient: FILE: Number of bytes written=139416 17/11/16 11:59:31 INFO mapred.JobClient: FILE: Number of read operations=0 17/11/16 11:59:31 INFO mapred.JobClient:
FILE: Number of large read operations=0 17/11/16 11:59:31 INFO mapred.JobClient: FILE: Number of write operations=0 17/11/16 11:59:31 INFO mapred.JobClient: Map-Reduce Framework 17/11/16 11:59:31 INFO mapred.JobClient: Map input records=0 17/11/16 11:59:31 INFO mapred.JobClient: Map output records=0 17/11/16 11:59:31 INFO mapred.JobClient: Map output bytes=0 17/11/16 11:59:31 INFO mapred.JobClient: Input split bytes=125 17/11/16 11:59:31 INFO mapred.JobClient: Combine input records=0 17/11/16 11:59:31 INFO mapred.JobClient: Combine output records=0 17/11/16 11:59:31 INFO mapred.JobClient: Spilled Records=0 17/11/16 11:59:31 INFO mapred.JobClient: CPU time spent (ms)=0 17/11/16 11:59:31 INFO mapred.JobClient: Physical memory (bytes) snapshot=0 17/11/16 11:59:31 INFO mapred.JobClient: Virtual memory (bytes) snapshot=0 17/11/16 11:59:31 INFO mapred.JobClient: Total committed heap usage (bytes)=363618304


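您把 Mapper 和 Reducer 声明成了(非静态的)内部类,这正是问题所在。内部类的实例只能依附于外部类(enclosing class)的实例而存在,因此它没有真正的无参构造函数;Hadoop 通过反射调用无参构造函数来创建 Mapper 时就会抛出 NoSuchMethodException。请把它们声明为 static 嵌套类: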


// Skeleton showing the required class layout; the generic arguments are
// elided as <...> in this sketch.
public class Friends {

    // static nested class: can be instantiated without an enclosing Friends
    // instance, which is what Hadoop's reflective construction needs.
    public static class FriendsMap extends Mapper <...> {}

    public static class FriendReducer extends Reducer <...> {}

    public static void main( String[] args) throws Exception { 
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Friends");
        FileInputFormat.addInputPath(job, new Path("input")); 
        FileOutputFormat.setOutputPath(job, new Path("output")); 
        // NOTE: the mapper/reducer/output-type wiring (setMapperClass, etc.)
        // is not shown in this skeleton.

        System.exit( job.waitForCompletion( true) ? 0 : 1); 
    }
}

关于java - Hadoop中的MapReduce程序,实现了一个简单的 “People You Might Know”,我们在Stack Overflow上找到一个类似的问题:


sql - HIVE-QL 中的 LEAD 函数语法

java - 如何在 netbeans JFrame 编辑器中显示自定义对话框?

java - 如何在数组列表中删除这种情况,而不是仅仅检查它是否发生

apache-spark - 将数据框保存到.txt或.csv文件

hadoop - Mahout - 异常 : Java Heap space

hadoop - 无法在 Ambari 服务器中安装 Impala

hadoop - 如何在 Hadoop 中获取 JobTracker 的实例?

java - 使用 JDBC 在 Oracle 数据库上创建 Java

java - 斐波那契使用 1 个变量

java - 2 java代码的幂帮助我输入不同的结果