hadoop - HBase MR - key/value class mismatch

Tags: hadoop mapreduce hbase

I am trying to run MR code against a standalone HBase (0.94.11).

I have read the HBase API and modified my MR code to read data from, and write results to, an HBase table, and I am hitting an exception in the reduce phase. Partial code is provided below (business logic excluded).

SentimentCalculatorHBase - the Tool/driver class:

package com.hbase.mapreduce;

import java.util.Calendar;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SentimentCalculatorHBase extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        SentimentCalculatorHBase sentimentCalculatorHBase = new SentimentCalculatorHBase();
        ToolRunner.run(sentimentCalculatorHBase, args);
    }

    @Override
    public int run(String[] arg0) throws Exception {
        System.out
                .println("***********************Configuration started***********************");
        Configuration configuration = getConf();
        System.out.println("Conf: " + configuration);


        Job sentiCalcJob = new Job(configuration, "HBase SentimentCalculation");

        sentiCalcJob.setJarByClass(SentimentCalculatorHBase.class);
        sentiCalcJob.setMapperClass(SentimentCalculationHBaseMapper.class);
        sentiCalcJob.setCombinerClass(SentimentCalculationHBaseReducer.class);
        sentiCalcJob.setReducerClass(SentimentCalculationHBaseReducer.class);


        sentiCalcJob.setInputFormatClass(TableInputFormat.class);
        sentiCalcJob.setOutputFormatClass(TableOutputFormat.class);

        /* Start : Added out of exasperation! */
        sentiCalcJob.setOutputKeyClass(ImmutableBytesWritable.class);
        sentiCalcJob.setOutputValueClass(Put.class);
        /* End : Added out of exasperation! */

        Scan twitterdataUserScan = new Scan();
        twitterdataUserScan.setCaching(500);

        twitterdataUserScan.addColumn("word_attributes".getBytes(),
                "TwitterText".getBytes());

        TableMapReduceUtil.initTableMapperJob("twitterdata_user",
                twitterdataUserScan, SentimentCalculationHBaseMapper.class,
                Text.class, Text.class, sentiCalcJob);

        TableMapReduceUtil.initTableReducerJob("sentiment_output",
                SentimentCalculationHBaseReducer.class, sentiCalcJob);

        Calendar beforeJob = Calendar.getInstance();
        System.out.println("Job Time started---------------- "
                + beforeJob.getTime());
        boolean check = sentiCalcJob.waitForCompletion(true);
        if (check == true) {
            System.out
                    .println("*******************Job completed- SentimentCalculation********************");
        }
        Calendar afterJob = Calendar.getInstance();
        System.out
                .println("Job Time ended SentimentCalculation---------------- "
                        + afterJob.getTime());
        return 0;
    }
}

The Mapper class:
import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;

public class SentimentCalculationHBaseMapper extends TableMapper<Text, Text> {

    private Text sentenseOriginal = new Text();
    private Text sentenseParsed = new Text();

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // Parsing/sentiment logic omitted; the mapper ultimately emits the
        // original and parsed sentences as a Text/Text pair.
        context.write(this.sentenseOriginal, this.sentenseParsed);
    }
}

The Reducer:
import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class SentimentCalculationHBaseReducer extends
        TableReducer<Text, Text, ImmutableBytesWritable> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        Double mdblSentimentOverall = 0.0;

        // s11, mstpositiveWords, mstnegativeWords and mstneutralWords are
        // produced by the business logic omitted from this snippet.
        String d3 = key + "@12321@" + s11.replaceFirst(":::", "")
                + "@12321@" + mstpositiveWords + "@12321@"
                + mstnegativeWords + "@12321@" + mstneutralWords;

        System.out.println("d3 : " + d3 + " , mdblSentimentOverall : "
                + mdblSentimentOverall);

        Put put = new Put(d3.getBytes());

        put.add(Bytes.toBytes("word_attributes"),
                Bytes.toBytes("mdblSentimentOverall"),
                Bytes.toBytes(mdblSentimentOverall));

        System.out.println("Context is " + context);

        context.write(new ImmutableBytesWritable(d3.getBytes()), put);
    }
}

The exception I am getting is:
13/09/05 16:16:17 INFO mapred.JobClient:  map 0% reduce 0%
13/09/05 16:23:31 INFO mapred.JobClient: Task Id : attempt_201309051437_0005_m_000000_0, Status : FAILED
java.io.IOException: wrong key class: class org.apache.hadoop.hbase.io.ImmutableBytesWritable is not class org.apache.hadoop.io.Text
        at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:164)
        at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1168)
        at org.apache.hadoop.mapred.Task$NewCombinerRunner$OutputConverter.write(Task.java:1492)
        at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
        at com.hbase.mapreduce.SentimentCalculationHBaseReducer.reduce(SentimentCalculationHBaseReducer.java:199)
        at com.hbase.mapreduce.SentimentCalculationHBaseReducer.reduce(SentimentCalculationHBaseReducer.java:1)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
        at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1513)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1436)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1298)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:699)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:766)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:370)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1149)
        at org.apache.hadoop.mapred.Child.main(Child.java:249)

To get past this, I changed the reducer's signature:
public class SentimentCalculationHBaseReducer extends
        TableReducer<Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        context.write(new Text(d3.getBytes()), put);
    }
}

But then I got an error again, this time for the value:
13/09/05 15:55:20 INFO mapred.JobClient: Task Id : attempt_201309051437_0004_m_000000_0, Status : FAILED
java.io.IOException: wrong value class: class org.apache.hadoop.hbase.client.Put is not class org.apache.hadoop.io.Text

I cannot figure out where I am going against the HBase MR API!

Best Answer

Just for the record, for anyone curious: I had exactly the same problem, and my solution was to remove the following line, which defines the combiner function:

sentiCalcJob.setCombinerClass(SentimentCalculationHBaseReducer.class);

The error appears to be caused by the map task trying to run the reduce class as a combiner (note that Reducer.run is invoked inside attempt xxxx_m, i.e. a map task), which produces the mismatch failure.
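
The underlying contract: a combiner consumes the map output and must emit those same map output types (Text/Text here), because its output is fed back into the shuffle rather than written to the HBase table, whereas this TableReducer emits ImmutableBytesWritable/Put. If pre-aggregation were wanted at all, it would have to live in a separate class along these lines (a hypothetical sketch only, not code from the question; the real pre-aggregation logic is not shown):

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical combiner sketch: it must emit the same Text/Text pairs
// the mapper produced, since combiner output goes back into the shuffle.
public class SentimentCombiner extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Identity pass-through; any real pre-aggregation would go here,
        // as long as it still writes Text keys and Text values.
        for (Text value : values) {
            context.write(key, value);
        }
    }
}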

Removing the combiner class definition solved the problem for me.
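
For reference, a minimal sketch of the corrected run() wiring under that fix, keeping the table and class names from the question. Note that the TableMapReduceUtil helpers already configure the input/output formats and key/value classes, so the manual set*Class calls can be dropped as well:

Job sentiCalcJob = new Job(configuration, "HBase SentimentCalculation");
sentiCalcJob.setJarByClass(SentimentCalculatorHBase.class);
// No setCombinerClass(...): the TableReducer emits
// ImmutableBytesWritable/Put, which a combiner must not do.

Scan twitterdataUserScan = new Scan();
twitterdataUserScan.setCaching(500);
twitterdataUserScan.addColumn(Bytes.toBytes("word_attributes"),
        Bytes.toBytes("TwitterText"));

// Sets TableInputFormat, the mapper class and the Text/Text map output types.
TableMapReduceUtil.initTableMapperJob("twitterdata_user",
        twitterdataUserScan, SentimentCalculationHBaseMapper.class,
        Text.class, Text.class, sentiCalcJob);

// Sets TableOutputFormat, the reducer class and the
// ImmutableBytesWritable/Writable output types.
TableMapReduceUtil.initTableReducerJob("sentiment_output",
        SentimentCalculationHBaseReducer.class, sentiCalcJob);

sentiCalcJob.waitForCompletion(true);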

Regarding hadoop - HBase MR - key/value class mismatch, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/18628542/
