java - Hadoop:在映射器的输出中使用自定义对象

标签 java hadoop mapreduce

我是 Hadoop 新手,并且被一些事情难住了:

我想做的是获取文件中的文本条目列表,并让初始映射器对它们进行一些处理,然后输出要由reducer聚合的自定义对象。

我使用所有文本值构建了一个框架,但当我尝试更改为使用我们自己的对象时,我得到了一个 NPE(如下所示)

这是驱动程序的 run():

JobConf conf = new JobConf( getConf(), VectorConPreprocessor.class );
conf.setJobName( JOB_NAME + " - " + JOB_ISODATE );           
m_log.info("JOB NAME:  " + conf.getJobName() );

// Probably need to change this to be a chain-mapper later on . . . . 

conf.setInputFormat(  TextInputFormat.class          );    // reading text from files

conf.setMapperClass(         MapMVandSamples.class  );
conf.setMapOutputValueClass( SparsenessFilter.class );

//conf.setCombinerClass( CombineSparsenessTrackers.class );  // not using combiner, because ALL nodes must be gathered before reduction     
conf.setReducerClass(  ReduceSparsenessTrackers.class  );    // not sure reducing is required here . . . . 

conf.setOutputKeyClass(   Text.class );    // output key will be the SHA2
conf.setOutputValueClass( Text.class );    // output value will be the FeatureVectorMap
conf.setOutputFormat(     SequenceFileOutputFormat.class );    // binary object writer          

这是映射器:

public class MapMVandSamples extends MapReduceBase implements Mapper<LongWritable, Text, Text, SparsenessFilter> 
{

    public static final String DELIM = ":";
    protected static Logger m_log    = Logger.getLogger( MapMVandSamples.class );    

    // In this case we're reading a line of text at a time from the file
    // We don't really care about the SHA256 for now, just create a SparsenessFilter
    //   for each entry.  The reducer will aggregate them later.
    @Override
    public void map( LongWritable bytePosition, Text lineOfText, OutputCollector<Text, SparsenessFilter> outputCollector, Reporter reporter ) throws IOException
    {                
        String[] data = lineOfText.toString().split( DELIM, 2 );
        String sha256 = data[0];
        String json   = data[1];

        // create a SparsenessFilter for this record
        SparsenessFilter sf = new SparsenessFilter();
        // crunching goes here

        outputCollector.collect( new Text("AllOneForNow"), sf );    
    }

}

最后,错误:

14/03/05 21:56:56 INFO mapreduce.Job: Task Id : attempt_1394084907462_0002_m_000000_1, Status : FAILED
Error: java.lang.NullPointerException
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:989)
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:390)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:418)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:162)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1491)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:157)

有什么想法吗? 我是否需要在 SparsenessFilter 上实现一个接口(interface)才能让 Mapper 的 OutputCollector 处理它?<​​/p>

谢谢!

最佳答案

所有自定义键和值都应实现 WritableComparable 接口(interface)。

您需要实现 readFields(DataInput in) 和 write(DataOutput out) 以及 CompareTo。

Example

关于java - Hadoop:在映射器的输出中使用自定义对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22216771/

相关文章:

java - 为什么我的 JSlider 没有改变值?

JavaFX 一张一张显示多个gif

java - Java 中的 AFOAuth2Client 等价物

hadoop - Hive 无法识别 hbase 中的数字类型值

java - 无法从 MapReduce 代码访问 HBase

java - 无法使用 powermock 子类化最终类

java - 如何从远程集群上的本地IDE运行MapReduce程序

hadoop - 使用 Cloudera Manager 安装 CDH : No such file or directory

hadoop - 从映射器输出中获取前 N 个项目 - Mapreduce

hadoop - hadoop流作业无法报告?