java - NullPointerException from Hadoop's JobSplitWriter/SerializationFactory when calling InputSplit's getClass()

Tags: java hadoop nullpointerexception mapreduce

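I'm getting a NullPointerException when launching a MapReduce job. It is thrown by the SerializationFactory's getSerializer() method. I'm using a custom InputSplit, InputFormat, RecordReader, and MapReduce value class.

I know the error is thrown some time after my InputFormat class creates the splits, but before the RecordReader is created. As far as I can tell, it occurs directly after the "Cleaning up the staging area" message.

Checking the Hadoop source at the locations indicated by the stack trace, it looks like the error occurs when getSerialization() receives a null Class<T> pointer. The JobClient's writeNewSplits() calls that method like this: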

Serializer<T> serializer = factory.getSerializer((Class<T>) split.getClass());

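So I assume that when getClass() is called on my custom InputSplit objects, it returns a null pointer, but that is just baffling. Any ideas?

The full stack trace of the error follows: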

12/06/24 14:26:49 INFO mapred.JobClient: Cleaning up the staging area hdfs://localhost:54310/tmp/hadoop-s3cur3/mapred/staging/s3cur3/.staging/job_201206240915_0035
Exception in thread "main" java.lang.NullPointerException
    at org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73)
    at org.apache.hadoop.mapreduce.split.JobSplitWriter.writeNewSplits(JobSplitWriter.java:123)
    at org.apache.hadoop.mapreduce.split.JobSplitWriter.createSplitFiles(JobSplitWriter.java:74)
    at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:968)
    at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:979)
    at org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:174)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:897)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:850)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:850)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:500)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:530)
    at edu.cs.illinois.cogcomp.hadoopinterface.infrastructure.CuratorJob.start(CuratorJob.java:94)
    at edu.cs.illinois.cogcomp.hadoopinterface.HadoopInterface.main(HadoopInterface.java:58)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:156)

Thanks!

EDIT: My code for the custom InputSplit follows:

import . . .

/**
 * A document directory within the input directory. 
 * Returned by DirectoryInputFormat.getSplits()
 * and passed to DirectoryInputFormat.createRecordReader().
 *
 * Represents the data to be processed by an individual Map process.
 */
public class DirectorySplit extends InputSplit {
    /**
     * Constructs a DirectorySplit object
     * @param docDirectoryInHDFS The location (in HDFS) of this
     *            document's directory, complete with all annotations.
     * @param fs The filesystem associated with this job
     */
    public  DirectorySplit( Path docDirectoryInHDFS, FileSystem fs )
            throws IOException {
        this.inputPath = docDirectoryInHDFS;
        hash = FileSystemHandler.getFileNameFromPath(inputPath);
        this.fs = fs;
    }

    /**
     * Get the size of the split so that the input splits can be sorted by size.
     * Here, we calculate the size to be the number of bytes in the original
     * document (i.e., ignoring all annotations).
     *
     * @return The number of characters in the original document
     */
    @Override
    public long getLength() throws IOException, InterruptedException {
        Path origTxt = new Path( inputPath, "original.txt" );
        HadoopInterface.logger.log( msg );
        return FileSystemHandler.getFileSizeInBytes( origTxt, fs);
    }

    /**
     * Get the list of nodes where the data for this split would be local.
     * This list includes all nodes that contain any of the required data---it's
     * up to Hadoop to decide which one to use.
     *
     * @return An array of the nodes for whom the split is local
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public String[] getLocations() throws IOException, InterruptedException {
        FileStatus status = fs.getFileStatus(inputPath);

        BlockLocation[] blockLocs = fs.getFileBlockLocations( status, 0,
                                                              status.getLen() );

        HashSet<String> allBlockHosts = new HashSet<String>();
        for( BlockLocation blockLoc : blockLocs ) {
            allBlockHosts.addAll( Arrays.asList( blockLoc.getHosts() ) );
        }

        return (String[])allBlockHosts.toArray();
    }

    /**
     * @return The hash of the document that this split handles
     */
    public String toString() {
        return hash;
    }

    private Path inputPath;
    private String hash;
    private FileSystem fs;
}

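Best answer

InputSplit does not extend Writable, so you need to explicitly declare that your input split implements Writable. With the default configuration, the SerializationFactory only finds a serialization for classes that implement Writable. For any other class, getSerialization() returns null, and getSerializer() then dereferences that null result. So split.getClass() is not returning null; the NullPointerException comes from the failed serialization lookup.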
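As a rough illustration of that fix, here is a minimal sketch rather than the asker's actual class. To keep it compilable without Hadoop on the classpath, it declares local stand-ins named Writable and InputSplit; in a real job these would be org.apache.hadoop.io.Writable and org.apache.hadoop.mapreduce.InputSplit. The simplified DirectorySplit (a String path instead of Path, no FileSystem field, placeholder getLength()/getLocations()) and the WritableSplitDemo helper are assumptions made for illustration. The sketch also assumes that Writable types are instantiated reflectively on the receiving side, which is why it adds a no-argument constructor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Local stand-in with the same two methods as org.apache.hadoop.io.Writable
// (an assumption so the sketch compiles without Hadoop).
interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Local stand-in for org.apache.hadoop.mapreduce.InputSplit,
// which is an abstract class and does not itself implement Writable.
abstract class InputSplit {
    public abstract long getLength() throws IOException, InterruptedException;
    public abstract String[] getLocations() throws IOException, InterruptedException;
}

// Simplified, hypothetical version of the question's DirectorySplit.
class DirectorySplit extends InputSplit implements Writable {
    private String inputPath = "";
    private String hash = "";

    // No-argument constructor so the split can be created before readFields() fills it in.
    public DirectorySplit() { }

    public DirectorySplit(String docDirectoryInHDFS) {
        this.inputPath = docDirectoryInHDFS;
        // Use the last path component as the document hash.
        this.hash = docDirectoryInHDFS.substring(docDirectoryInHDFS.lastIndexOf('/') + 1);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(inputPath);
        out.writeUTF(hash);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Fields must be read in the same order they were written.
        inputPath = in.readUTF();
        hash = in.readUTF();
    }

    @Override
    public long getLength() { return 0L; }                    // placeholder

    @Override
    public String[] getLocations() { return new String[0]; }  // placeholder

    @Override
    public String toString() { return hash; }
}

class WritableSplitDemo {
    // The kind of test a Writable-based serialization applies to a class.
    static boolean isWritable(Class<?> c) {
        return Writable.class.isAssignableFrom(c);
    }

    // Serialize a split to bytes and rebuild a fresh copy from those bytes.
    static DirectorySplit roundTrip(DirectorySplit split) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            split.write(new DataOutputStream(bytes));
            DirectorySplit copy = new DirectorySplit();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        DirectorySplit copy = roundTrip(new DirectorySplit("/input/doc42"));
        System.out.println(copy); // prints "doc42"
    }
}
```

In the real class, the Path could be written as a string in write() and rebuilt in readFields(). The FileSystem handle is not something to serialize, so it would presumably need to be obtained again from the job's configuration on the task side.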

Source: the original question on Stack Overflow, https://stackoverflow.com/questions/11180776/
