caching - Error while reading file contents - MapReduce

Tags: caching hadoop nullpointerexception mapreduce

Part of my program reads the contents of a file that was passed to it as an argument and prints it to the console. When I try to read the file's contents, I get an error. If I leave out setup() and run the program, it works fine, but I want to display the file's contents. Here is my code:

package search;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.text.Format;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;

import org.apache.commons.httpclient.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;



public class search {



    public static class SearchMapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        // Map code goes here.
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void setup(Context context) throws IOException, InterruptedException{
            Configuration conf = context.getConfiguration();
            java.net.URI[] localPaths = context.getCacheFiles();
            BufferedReader reader = null;

            try {
                File file = new File(localPaths[0]);
                reader = new BufferedReader(new FileReader(file));

                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);
                }

            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }


        } 

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String txt = value.toString();

            word = context.getCurrentValue();
            context.getCurrentKey();
            word.set(txt);
            context.write(word, one);
        }
    }

    public static class SearchReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        // Reduce code goes here.

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: SearchCounter <in> <out>");
            System.exit(2);
        }

        Job job = Job.getInstance(new Configuration());
        job.addCacheFile(new Path("/Users/praveen/input/").toUri());
        job =new Job(conf);

        job.setJarByClass(search.class);
        job.setMapperClass(SearchMapper.class);
        job.setCombinerClass(SearchReducer.class);
        job.setReducerClass(SearchReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

I get the following error:
java.lang.Exception: java.lang.NullPointerException
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:403)
Caused by: java.lang.NullPointerException
    at search.search$SearchMapper.setup(search.java:59)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:142)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:339)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:235)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)
2015-06-06 17:00:42,907 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1338)) - Job job_local1148982887_0001 running in uber mode : false
2015-06-06 17:00:42,909 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1345)) -  map 0% reduce 0%
2015-06-06 17:00:42,912 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1358)) - Job job_local1148982887_0001 failed with state FAILED due to: NA
2015-06-06 17:00:42,921 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1363)) - Counters: 0

Please point out what is wrong here.

Best Answer

Why do you create the job again after adding the cache file to it? The reassignment discards the Job instance on which you called addCacheFile(), so the job that actually runs has no cache file registered; context.getCacheFiles() then returns null in setup(), and dereferencing localPaths[0] is what throws the NullPointerException. Try removing this line:

    job =new Job(conf);
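For reference, here is a minimal sketch of the driver with a single Job instance; it is your code from above, just without the second Job (the cache-file path is the one from your question, so substitute your own):

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: SearchCounter <in> <out>");
            System.exit(2);
        }

        // Create the Job once, from the parsed conf, and register the
        // cache file on that same instance. Do not replace it afterwards,
        // or the cache-file registration is lost.
        Job job = Job.getInstance(conf);
        job.addCacheFile(new Path("/Users/praveen/input/").toUri());

        job.setJarByClass(search.class);
        job.setMapperClass(SearchMapper.class);
        job.setCombinerClass(SearchReducer.class);
        job.setReducerClass(SearchReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

As a side note, it doesn't hurt to guard setup() against a missing registration, so a misconfigured job fails with a clear message instead of a bare NullPointerException:

    java.net.URI[] localPaths = context.getCacheFiles();
    if (localPaths == null || localPaths.length == 0) {
        throw new IOException("No cache files were registered on the job");
    }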

Regarding caching - Error while reading file contents - MapReduce, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/30684960/
