hadoop - HBase MapReduce job with MultipleInputs: LongWritable cannot be cast to ImmutableBytesWritable

Tags: hadoop, mapreduce, hbase

I am writing an MR job that takes an HBase table as input and dumps it to an HDFS file. I intend to use the MultipleInputs class (from Hadoop) because I plan to feed the job from multiple data sources. I wrote a very simple MR program (see the source code below), but unfortunately I run into the following error:

java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.hbase.io.ImmutableBytesWritable

I am running pseudo-distributed Hadoop (1.2.0) and pseudo-distributed HBase (0.95.1-hadoop1).

Here is the complete source code. Interestingly, if I comment out the MultipleInputs line "MultipleInputs.addInputPath(job, inputPath1, TextInputFormat.class, TableMap.class);", the MR job runs fine.

public class MixMR {

public static class TableMap extends TableMapper<Text, Text>  {
    public static final byte[] CF = "cf".getBytes();
    public static final byte[] ATTR1 = "c1".getBytes();

    public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {

        String key = Bytes.toString(row.get());
        String val = new String(value.getValue(CF, ATTR1));

        context.write(new Text(key), new Text(val));
    }
}


public static class Reduce extends Reducer  <Object, Text, Object, Text> {
    public void reduce(Object key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        String ks = key.toString();
        for (Text val : values){
            context.write(new Text(ks), val);
        }

    }
}

public static void main(String[] args) throws Exception {
    Path inputPath1 = new Path(args[0]);
    Path outputPath = new Path(args[1]);

    String tableName1 = "test";

    Configuration config = HBaseConfiguration.create();
    Job job = new Job(config, "ExampleRead");
    job.setJarByClass(MixMR.class);     // class that contains mapper


    Scan scan = new Scan();
    scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
    scan.setCacheBlocks(false);  // don't set to true for MR jobs
    scan.addFamily(Bytes.toBytes("cf"));

    TableMapReduceUtil.initTableMapperJob(
            tableName1,       // input HBase table name
            scan,             // Scan instance to control CF and attribute selection
            TableMap.class,   // mapper
            Text.class,       // mapper output key
            Text.class,       // mapper output value
            job);
    job.setReducerClass(Reduce.class);    // reducer class
    job.setOutputFormatClass(TextOutputFormat.class);  

    // inputPath1 here has no effect for HBase table
    MultipleInputs.addInputPath(job, inputPath1, TextInputFormat.class, TableMap.class);

    FileOutputFormat.setOutputPath(job, outputPath);

    job.waitForCompletion(true);
}

}

Best answer

I found the answer: in the following statement, replace TextInputFormat.class with TableInputFormat.class.

MultipleInputs.addInputPath(job, inputPath1, TextInputFormat.class, TableMap.class);

The ClassCastException occurs because TextInputFormat hands the mapper LongWritable (byte-offset) keys and Text values, while TableMap is a TableMapper whose map() expects an ImmutableBytesWritable row key and an HBase Result.
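A minimal sketch of the corrected call, assuming the rest of the job setup stays as posted above (TableMapReduceUtil.initTableMapperJob has already put the table name and the serialized Scan into the job configuration, which is where TableInputFormat reads them from, so the path argument is effectively ignored for this source):

import org.apache.hadoop.hbase.mapreduce.TableInputFormat;

// TableInputFormat emits ImmutableBytesWritable/Result pairs,
// which matches the input types TableMap expects.
MultipleInputs.addInputPath(job, inputPath1, TableInputFormat.class, TableMap.class);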

Regarding "hadoop - HBase MapReduce job with MultipleInputs: LongWritable cannot be cast to ImmutableBytesWritable", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/17604942/
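Since the original goal was to take multiple data sources, below is a rough sketch of how a second, plain-text HDFS source could sit next to the HBase table. The names TextFileMap and inputPath2, and the tab-separated parsing, are hypothetical; the one hard requirement is that every mapper emits the same output key/value types (Text, Text) consumed by the reducer. In addition to the imports the program above already needs, this uses org.apache.hadoop.io.LongWritable and org.apache.hadoop.mapreduce.Mapper.

// Hypothetical mapper for a plain-text source with tab-separated key/value lines.
public static class TextFileMap extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String[] parts = line.toString().split("\t", 2);
        if (parts.length == 2) {
            context.write(new Text(parts[0]), new Text(parts[1]));
        }
    }
}

// In main(), register each source with its own input format and mapper:
MultipleInputs.addInputPath(job, inputPath1, TableInputFormat.class, TableMap.class);
MultipleInputs.addInputPath(job, inputPath2, TextInputFormat.class, TextFileMap.class);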
