hadoop - 在 Mapper 类中检索到的键和值的空白/空值

标签 hadoop mapreduce cloudera

我已经编写了一个 MapReduce 代码用于在 CDH4 集群上运行它。我的要求是读取完整的文件作为值,文件名作为键。为此,我编写了自定义 InputFormat 和 RecordReader 类。

自定义输入格式类:FullFileInputFormat.java

import java.io.*;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

import FullFileRecordReader;

public class FullFileInputFormat extends FileInputFormat<Text, Text> {

    @Override
    public RecordReader<Text, Text> getRecordReader(InputSplit split, JobConf jobConf, Reporter reporter) throws IOException {
        reporter.setStatus(split.toString());
        return new FullFileRecordReader((FileSplit) split, jobConf);
    }
}

自定义 RecordReader 类:FullFileRecordReader.java

import java.io.BufferedReader;
import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;

public class FullFileRecordReader implements RecordReader<Text, Text> {

    private BufferedReader in;
    private boolean processed = false;
    private int processedBytes = 0;

    private FileSplit fileSplit;
    private JobConf conf;

    public FullFileRecordReader(FileSplit fileSplit, JobConf conf) {
        this.fileSplit = fileSplit;
        this.conf = conf;
    }

    @Override
    public void close() throws IOException {
        if (in != null) {
            in.close();
        }
    }

    @Override
    public Text createKey() {
        return new Text("");
    }

    @Override
    public Text createValue() {
        return new Text("");
    }

    @Override
    public long getPos() throws IOException {
        return processedBytes;
    }

    @Override
    public boolean next(Text key, Text value) throws IOException {
        Path filePath = fileSplit.getPath();

        if (!processed) {
            key = new Text(filePath.getName());

            value = new Text("");
            FileSystem fs = filePath.getFileSystem(conf);
            FSDataInputStream fileIn = fs.open(filePath);
            byte[] b = new byte[1024];
            int numBytes = 0;

            while ((numBytes = fileIn.read(b)) > 0) {
                value.append(b, 0, numBytes);
                processedBytes += numBytes;
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public float getProgress() throws IOException {
        return 0;
    }
}

虽然每当我尝试在 RecordReader 类中打印键值时,我都会得到它们的值,但是当我在映射器类中打印相同的值时,我会看到它们的空白值。我无法理解为什么 Mapper 类无法获取键和值的任何数据。

目前我只有一个 Map 作业,没有 reduce 作业。代码是:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.*;

import FullFileInputFormat;

public class Source {

    public static class Map extends MapReduceBase implements Mapper<Text, Text, Text, Text> {

        public void map(Text key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws java.io.IOException {
            System.out.println("Processing " + key.toString());
            System.out.println("Value: " + value.toString());
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(Source.class);
        job.setJobName("Source");

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setJarByClass(Source.class);
        job.setInputFormat(FullFileInputFormat.class);
        job.setMapperClass(Map.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        JobClient.runJob(job);
    }
}

最佳答案

您将在下一个方法中创建新实例 - hadoop 重新使用对象,因此您需要填充传递的对象。应该很简单,修改如下:

@Override
public boolean next(Text key, Text value) throws IOException {
    Path filePath = fileSplit.getPath();

    if (!processed) {
        // key = new Text(filePath.getName());
        key.set(filePath.getName());

        // value = new Text("");
        value.clear();
    }

我还建议预先调整值文本的大小,以避免值的底层字节数组“成长”的痛苦。 Text 有一个名为 setCapacity 的私有(private)方法,所以不幸的是你不能调用它 - 但如果你使用 BytesWritable 来缓冲文件输入,你可以在你的下一个方法中调用 setCapacity,传递 fileSplit 长度(注意这可能仍然是错误的如果您的文件被压缩 - 因为文件大小是压缩后的大小)。

关于hadoop - 在 Mapper 类中检索到的键和值的空白/空值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14231658/

相关文章:

hadoop - No nodes available to query 错误

javascript - mongodb 中的加密哈希函数

Hadoop Mapper 运行缓慢

hadoop - Lily Hbase Indexers 无故退出

sql - 加入Apache Hive 0.14更新和删除查询

mongodb - 数据记录和NoSQL

hadoop - 无法格式化Cantos java.lang.Internal Error中的名称节点

java - HBase MapReduce中的Nullpointer异常

hadoop - hadoop节点未用于 map task

java - Oozie-将jar文件复制到Cloudera上的lib文件夹时被截断