java - Hadoop 2.4.0 + HCatalog + Mapreduce

Tags: java hadoop mapreduce hcatalog

On Hadoop 2.4.0, running the code sample below fails with the error shown. I suspect a Hadoop version mismatch. Could you review the code and tell me how to fix it?

I am trying to write a map-reduce job that copies an HCatalog table.

Thank you.

Exception in thread "main" java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
at org.apache.hcatalog.mapreduce.HCatBaseOutputFormat.getJobInfo(HCatBaseOutputFormat.java:94)
at org.apache.hcatalog.mapreduce.HCatBaseOutputFormat.getOutputFormat(HCatBaseOutputFormat.java:82)
at org.apache.hcatalog.mapreduce.HCatBaseOutputFormat.checkOutputSpecs(HCatBaseOutputFormat.java:72)
at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:458)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:343)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1285)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1282)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:396)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1303)
at org.deneme.hadoop.UseHCat.run(UseHCat.java:102)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
at org.deneme.hadoop.UseHCat.main(UseHCat.java:107)

Code sample:
public class UseHCat extends Configured implements Tool{
public static class Map extends Mapper<WritableComparable, HCatRecord,Text,IntWritable>     {
    String groupname;

    @Override
    protected void map( WritableComparable key,
                        HCatRecord value,
                        org.apache.hadoop.mapreduce.Mapper<WritableComparable, HCatRecord,
                        Text, IntWritable>.Context context)
        throws IOException, InterruptedException {
        // The group table from /etc/group has name, 'x', id
        groupname = (String) value.get(0);
        int id = (Integer) value.get(2);
        // Just select and emit the name and ID
        context.write(new Text(groupname), new IntWritable(id));
    }
}

public static class Reduce extends Reducer<Text, IntWritable,
                                   WritableComparable, HCatRecord> {

    protected void reduce( Text key,
                           java.lang.Iterable<IntWritable> values,
                           org.apache.hadoop.mapreduce.Reducer<Text, IntWritable,
                           WritableComparable, HCatRecord>.Context context)
        throws IOException, InterruptedException {
        // Only expecting one ID per group name
        Iterator<IntWritable> iter = values.iterator();
        IntWritable iw = iter.next();
        int id = iw.get();
        // Emit the group name and ID as a record
        HCatRecord record = new DefaultHCatRecord(2);
        record.set(0, key.toString());
        record.set(1, id);
        context.write(null, record);
    }
}

public int run(String[] args) throws Exception {
    Configuration conf = getConf(); //hdfs://sandbox.hortonworks.com:8020
    //conf.set("fs.defaultFS", "hdfs://192.168.1.198:8020");
    //conf.set("mapreduce.job.tracker", "192.168.1.115:50001");
    //Configuration conf = new Configuration();
    //conf.set("fs.defaultFS", "hdfs://192.168.1.198:8020/data");
    args = new GenericOptionsParser(conf, args).getRemainingArgs();

    // Get the input and output table names as arguments
    String inputTableName = args[0];
    String outputTableName = args[1];
    // Assume the default database
    String dbName = null;


    String jobName = "UseHCat";
    String userChosenName = getConf().get(JobContext.JOB_NAME);
    if (userChosenName != null)
      jobName += ": " + userChosenName;
    Job job = Job.getInstance(getConf());
    job.setJobName(jobName);

//        Job job = new Job(conf, "UseHCat");
//        HCatInputFormat.setInput(job, InputJobInfo.create(dbName,inputTableName, null));

    HCatInputFormat.setInput(job, dbName, inputTableName);

    job.setJarByClass(UseHCat.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    // An HCatalog record as input
    job.setInputFormatClass(HCatInputFormat.class);

    // Mapper emits a string as key and an integer as value
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    // Ignore the key for the reducer output; emitting an HCatalog record as value
    job.setOutputKeyClass(WritableComparable.class);
    job.setOutputValueClass(DefaultHCatRecord.class);
    job.setOutputFormatClass(HCatOutputFormat.class);

    HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));
    HCatSchema s = HCatOutputFormat.getTableSchema(job.getConfiguration());
    System.err.println("INFO: output schema explicitly set for writing:" + s);
    HCatOutputFormat.setSchema(job, s);
    return (job.waitForCompletion(true) ? 0 : 1);
}

public static void main(String[] args) throws Exception {
//      System.setProperty("hadoop.home.dir", "C:"+File.separator+"hadoop-2.4.0");
    int exitCode = ToolRunner.run(new UseHCat(), args);
    System.exit(exitCode);
}

}

Best answer

In Hadoop 1.x.x, JobContext is a class, whereas in Hadoop 2.x.x it is an interface, so the HCatalog-core API (built against Hadoop 1.x) is not binary-compatible with Hadoop 2.x.x.

To fix the problem, the following code in the HCatBaseOutputFormat class needs to be changed:

// Hadoop 1.x: JobContext was a concrete class and could be instantiated directly.
//JobContext ctx = new JobContext(conf, jobContext.getJobID());
// Hadoop 2.x: JobContext is an interface; Job, which implements it, is used instead.
JobContext ctx = new Job(conf);
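
For reference, here is a minimal, self-contained sketch (not the actual HCatalog source) of the two usual ways to obtain a JobContext on Hadoop 2.x, where the interface can no longer be instantiated directly; the class name JobContextCompat and its two helper methods are illustrative only:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.task.JobContextImpl;

public class JobContextCompat {

    // Hadoop 1.x allowed: new JobContext(conf, jobId).
    // In Hadoop 2.x, JobContext is an interface, so bytecode compiled against
    // the old class no longer links, which is what the
    // IncompatibleClassChangeError above reports.

    // Option 1: Job implements JobContext in Hadoop 2.x.
    static JobContext fromJob(Configuration conf) throws IOException {
        return Job.getInstance(conf);
    }

    // Option 2: use the concrete implementation class directly.
    static JobContext fromImpl(Configuration conf, JobID jobId) {
        return new JobContextImpl(conf, jobId);
    }
}

After patching HCatBaseOutputFormat along these lines, the hcatalog-core sources have to be rebuilt against the Hadoop 2.x artifacts so that the recompiled jar is the one used at job submission time.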

Regarding java - Hadoop 2.4.0 + HCatalog + Mapreduce, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/24926927/
