java - Hbase 映射减少 : how to use custom class as value for the mapper and/or reducer?

标签 java hadoop mapreduce hbase

我正在尝试熟悉 Hadoop/Hbase MapReduce 作业,以便能够正确编写它们。现在我有一个 Hbase 实例,其中包含一个名为 dns 的表,其中包含一些 DNS 记录。我试图制作一个简单的唯一域计数器来输出文件并且它有效。现在,我只使用 IntWritableText,我想知道是否可以为我的 Mapper/Reducer 使用自定义对象。我试着自己做,但我得到了

Error: java.io.IOException: Initialization of all the collectors failed. Error in last collector was :null
    at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:415)
    at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:698)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:770)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:170)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:164)
Caused by: java.lang.NullPointerException
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1011)
    at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:402)
    ... 9 more

由于我是新手,所以我实际上不知道该怎么做。我猜我必须实现一个或多个接口(interface)或扩展一个抽象类,但我找不到 here或互联网上的适当示例。

我试图从我的 dns 表中创建一个简单的域计数器,但使用类作为整数的包装器(仅用于教学目的)。我的 map 类如下所示:

public class Map extends TableMapper<Text, MapperOutputValue> {
    private static byte[] columnName = "fqdn".getBytes();
    private static byte[] columnFamily = "d".getBytes();

    public void map(ImmutableBytesWritable row, Result value, Context context)
            throws InterruptedException, IOException {

        String fqdn = new String(value.getValue(columnFamily, columnName));
        Text key = new Text();
        key.set(fqdn);
        context.write(key, new MapperOutputValue(1));

    }
}

Reducer:

public class Reduce extends Reducer<Text, MapperOutputValue, Text, IntWritable> {
    @Override
    public void reduce(Text key, Iterable<MapperOutputValue> values, Context context)
            throws IOException, InterruptedException {

        int i = 0;
        for (MapperOutputValue val : values) {
            i += val.getCount();
        }

        context.write(key, new IntWritable(i));
    }
}

还有我的 Driver/Main 函数的一部分:

 TableMapReduceUtil.initTableMapperJob(
                "dns",
                scan,
                Map.class,
                Text.class,
                MapperOutputValue.class,
                job);

/* Set output parameters */
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);

正如我所说,MapperOutputValue 只是一个简单的类,它包含一个私有(private) Integer、一个带参数的构造函数、一个 getter 和一个 setter。我也尝试添加一个 toString 方法,但它仍然不起作用。

所以我的问题是:使用自定义类作为映射器输出/reducer 输入的最佳方法是什么?另外,假设我想使用一个具有多个字段的类作为 reducer 的最终输出。这个类应该实现/扩展什么?这是个好主意还是我应该坚持使用“基元”作为 IntWritable 或 Text?

谢谢!

最佳答案

MapOutputValue 应该实现 Writable,以便它可以在 MapReduce 作业中的任务之间序列化。将 MapOutputJob 替换为以下应该有效:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class DomainCountWritable implements Writable {
    private Text domain;
    private IntWritable count;

    public DomainCountWritable() {
        this.domain = new Text();
        this.count = new IntWritable(0);
    }

    public DomainCountWritable(Text domain, IntWritable count) {
        this.domain = domain;
        this.count = count;
    }

    public Text getDomain() {
        return this.domain;
    }

    public IntWritable getCount() {
        return this.count;
    }

    public void setDomain(Text domain) {
        this.domain = domain;
    }

    public void setCount(IntWritable count) {
        this.count = count;
    }

    public void readFields(DataInput in) throws IOException {
        this.domain.readFields(in);
        this.count.readFields(in);
    }

    public void write(DataOutput out) throws IOException {
        this.domain.write(out);
        this.count.write(out);
    }

    @Override
    public String toString() {
        return this.domain.toString() + "\t" + this.count.toString();
    }
}

关于java - Hbase 映射减少 : how to use custom class as value for the mapper and/or reducer?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55531180/

相关文章:

java - volatile 关键字原子性

java - 直接遍历 JAR 文件中的文件夹

python - Mrjob 无法在 dataproc 上创建集群 : __init__() got an unexpected keyword argument 'channel'

java - 为什么它打印两次?如果别的

java - 无法解析类型 org.eclipse.core.runtime.IConfigurationElement

apache-spark - 如何配置Apache Spark 2.4.5连接到HIVE的MySQL元存储库?

amazon-web-services - 尝试在 EMR 上安装 Spark 时引导失败

hadoop - 每个 map 的任务运行与mapred-site.xml中配置的任务的差异

hadoop - 如何在Apache Crunch中进行Map端全外部联接(MapsideJoinStrategy不支持的联接类型FULL_OUTER_JOIN)

hadoop - 将文件拆分为 80% 和 20% 以在 MapReduce 中构建模型和预测的更好方法