java - Hadoop: data missing from list outside of Eclipse

Tags: java, eclipse, list, hadoop, mapreduce

I wrote a simple MapReduce job (based on the word-count example) to get the total number of words in a text file. I go through the file line by line and do some processing before mapping. Everything seems to work, except for removing certain words from each line before it is mapped.

Before starting the job, I read the list of words that should be removed from a file. I let the program print the word list after reading it in, and that works fine. The problem: once the job starts, the ArrayList containing the words suddenly appears to be empty again. Interestingly, this only happens when I launch the program (as a jar file) outside of Eclipse; inside Eclipse the words are removed. Outside of Eclipse the final result is 13.2 million, although the total (without removing any words from the list) should be 13.4 million words. Inside Eclipse the result is 8.4 million, as it should be.

Why is that? Any help is much appreciated!

Here is my code:

import java.io.*;
import java.util.*; 

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.*; 
import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.io.file.tfile.TFile.Reader.Scanner;
import org.apache.hadoop.mapred.*; 
import org.apache.hadoop.util.*; 

public class WordCount { 

    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text,  NullWritable, IntWritable> { 

        private final static IntWritable one = new IntWritable(1); 
        private final static NullWritable nullKey = NullWritable.get();

        public void map(LongWritable key, Text value, OutputCollector< NullWritable, IntWritable> output, Reporter reporter) throws IOException { 

            String processedline = LineProcessor.processLine(value.toString());

            StringTokenizer tokenizer = new StringTokenizer(processedline); 
            while (tokenizer.hasMoreTokens()) { 
                tokenizer.nextToken();
                output.collect(nullKey, one); 
            } 
        }  

    } 

    public static class Reduce extends MapReduceBase implements Reducer<NullWritable, IntWritable, NullWritable, IntWritable> { 

        private final static NullWritable nullKey = NullWritable.get();

        public void reduce(NullWritable key, Iterator<IntWritable> values, OutputCollector<NullWritable, IntWritable> output, Reporter reporter) throws IOException { 

            int sum = 0; 
            while (values.hasNext()) { 
                sum += values.next().get(); 
            } 
            output.collect(nullKey, new IntWritable(sum)); 
        }

    } 

    public static class LineProcessor{
        public static ArrayList<String> stopWordsList = new ArrayList<String>();

        public static void initializeStopWords() throws IOException{
            Path stop_words = new Path("/user/ds2013/stop_words/english_stop_list.txt");
            FileSystem fs = FileSystem.get(new Configuration());
            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(stop_words)));
            String stopWord;
            stopWord = br.readLine();

            while (stopWord != null){
                //addToStopWords
                stopWordsList.add(stopWord);
                stopWord = br.readLine();
            }
        }

        public static String processLine(String line) {
            line = line.toLowerCase();
            //delete some punctuation
            char[] remove = {'.', ',','"'};
            for (char c : remove) {
                line = line.replace(""+c, "");
            }
            //Replace "-" with Space
            line = line.replace("-", " ");

            //delete stop Words
            StringTokenizer tokenizer = new StringTokenizer(line); 
            String nextWord = tokenizer.nextToken();
            while (tokenizer.hasMoreTokens()) {     
                if(stopWordsList.contains(nextWord)){
                    line = line.replace(nextWord, "");
                }
                nextWord = tokenizer.nextToken();
            } 

            return line;
        }
    }

    public static void main(String[] args) throws Exception { 
        JobConf conf = new JobConf(WordCount.class); 
        conf.setJobName("wordcount"); 
        conf.setMapOutputKeyClass(NullWritable.class);
        conf.setMapOutputValueClass(IntWritable.class);
        conf.setOutputKeyClass(NullWritable.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class); 
        conf.setCombinerClass(Reduce.class); 
        conf.setReducerClass(Reduce.class); 

        conf.setInputFormat(TextInputFormat.class); 
        conf.setOutputFormat(TextOutputFormat.class);
        //initialize list of words that should be deleted
        LineProcessor.initializeStopWords();

        //Directories

        FileInputFormat.setInputPaths(conf, new Path("/user/ds2013/data/plot_summaries.txt"));


        Path outputDir = new Path( args[0] );
        //delete output folder if it already exists
        FileSystem fs = FileSystem.get(conf);
        fs.delete(outputDir, true);
        FileOutputFormat.setOutputPath(conf, outputDir);


        JobClient.runJob(conf); 

    } 
}
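As an aside, the stop-word loop in `processLine` fetches `nextToken()` before the `hasMoreTokens()` check, so the last token of each line is never tested (and it throws on empty lines); also, `line.replace(nextWord, "")` deletes matching substrings inside longer words (removing the stop word "a" strips every "a" from the line). A minimal whole-word filter in plain Java, with illustrative names (`StopWordFilter`, a hard-coded stop list standing in for the HDFS file), might look like:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.StringTokenizer;

// Hypothetical sketch of whole-word stop-word removal; the stop list
// here is hard-coded only for illustration.
public class StopWordFilter {
    static final Set<String> STOP_WORDS = new HashSet<>(Arrays.asList("the", "a", "of"));

    static String filter(String line) {
        StringBuilder kept = new StringBuilder();
        StringTokenizer tokenizer = new StringTokenizer(line.toLowerCase());
        while (tokenizer.hasMoreTokens()) {
            // every token is tested, including the last one
            String word = tokenizer.nextToken();
            if (!STOP_WORDS.contains(word)) {
                if (kept.length() > 0) kept.append(' ');
                kept.append(word);
            }
        }
        return kept.toString();
    }

    public static void main(String[] args) {
        System.out.println(filter("The plot of a movie")); // -> "plot movie"
    }
}
```

Rebuilding the line from kept tokens avoids both the skipped final token and the substring-replacement problem.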

Best Answer

If you submit the job via the command line, a separate client process is created for it. So the initialization you do in the main method:

LineProcessor.initializeStopWords();

is running in a completely different process. You would normally move this init work into the mapper's setup function, which you can override (in the old API you are using):

@Override
public void configure(JobConf job) {
   try {
      LineProcessor.initializeStopWords();
   } catch (IOException e) {
      // configure() is not declared to throw IOException
      throw new RuntimeException(e);
   }
}

or, in the newer API:

@Override
protected void setup(Context context) throws IOException, InterruptedException {
   LineProcessor.initializeStopWords();
}
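The underlying point can be shown without Hadoop: static fields are per JVM, so a list filled in the client's `main()` never reaches the mappers' JVMs. Guarding the load with an idempotent init that each task calls from `configure()`/`setup()` makes every JVM self-sufficient. A minimal sketch, with illustrative names (`StopWords`, `ensureLoaded`) and a hard-coded list standing in for the HDFS read:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of idempotent, per-JVM initialization; not the original job code.
public class StopWords {
    static final List<String> LIST = new ArrayList<>();
    static int loadCount = 0;

    static synchronized void ensureLoaded() {
        if (!LIST.isEmpty()) return; // already loaded in this JVM
        // stands in for reading the stop-word file from HDFS
        LIST.addAll(Arrays.asList("the", "a", "of"));
        loadCount++;
    }

    public static void main(String[] args) {
        // simulate configure()/setup() firing for several tasks in one JVM
        ensureLoaded();
        ensureLoaded();
        System.out.println(LIST.size() + " words, loaded " + loadCount + " time(s)");
    }
}
```

Calling `ensureLoaded()` from every task costs one file read per JVM but guarantees the list is populated wherever the map code actually runs.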

Regarding "java - Hadoop: data missing from list outside of Eclipse", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/20116170/
