java - AvroMultipleInputs: problem when adding multiple paths

Tags: java hadoop mapreduce avro

This is the driver code snippet I use to add multiple Avro input paths with different mapper classes:

AvroMultipleInputs.addInputPath(jobConf, new Path(args[0]), IncrementalDataMapper.class, incrSchema);
AvroMultipleInputs.addInputPath(jobConf, new Path(args[1]), BaseDataMapper.class, incrSchema);

AvroJob.setMapOutputSchema(jobConf, Pair.getPairSchema(Schema.create(Type.STRING), incrSchema));

AvroJob.setReducerClass(jobConf, DeltaCaptureReducer.class);
AvroJob.setInputSchema(jobConf, Pair.getPairSchema(Schema.create(Type.STRING), incrSchema));
AvroJob.setOutputSchema(jobConf, incrSchema);

When I run this driver, I get the following exception, which is thrown from the getInputSchemaMap(...) method of AvroMultipleInputs:

java.lang.RuntimeException: org.apache.avro.SchemaParseException: Can't redefine: com.sample.Test



To investigate, I reproduced the problem in a standalone program that mimics the getInputSchemaMap(...) method of AvroMultipleInputs.

Standalone code

Failing code:
    Schema.Parser schemaParser = new Schema.Parser();
    String m1 = "path1;" + toBase64("{\"type\":\"record\",\"name\":\"Test\",\"namespace\":\"com.sample\",\"fields\":[ {\"name\":\"BATCH_ID\",\"type\":[\"null\",\"long\"]} ] }");
    String m2 = "path2;" + toBase64("{\"type\":\"record\",\"name\":\"Test\",\"namespace\":\"com.sample\",\"fields\":[ {\"name\":\"BATCH_ID\",\"type\":[\"null\",\"long\"]} ] }");
    String[] schemaMappings = (m1 + "," + m2).split(",");
    for (String schemaMapping : schemaMappings) {
        String[] split = schemaMapping.split(";");
        String schemaString = fromBase64(split[1]);
        System.out.println(schemaString);
        Schema inputSchema;
        try {
            inputSchema = schemaParser.parse(schemaString);
        } catch (SchemaParseException e) {
            throw new RuntimeException(e);
        }
    }
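
The snippet above calls toBase64 and fromBase64 without showing them. A minimal sketch of what such helpers could look like, assuming java.util.Base64 and UTF-8 (the real AvroMultipleInputs may implement them differently). The class name SchemaMappingCodec is hypothetical and only used for illustration:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper class; not part of Avro or Hadoop.
public class SchemaMappingCodec {

    // Encode a schema JSON string so it can be embedded in a "path;schema" mapping.
    public static String toBase64(String text) {
        return Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
    }

    // Decode a Base64 string back into the original schema JSON.
    public static String fromBase64(String encoded) {
        return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String schemaJson = "{\"type\":\"record\",\"name\":\"Test\",\"namespace\":\"com.sample\","
                + "\"fields\":[{\"name\":\"BATCH_ID\",\"type\":[\"null\",\"long\"]}]}";

        // Same "path;base64(schema)" layout as in the standalone code, joined with commas.
        String mappings = "path1;" + toBase64(schemaJson) + "," + "path2;" + toBase64(schemaJson);

        for (String mapping : mappings.split(",")) {
            String[] split = mapping.split(";");
            System.out.println(split[0] + " -> " + fromBase64(split[1]));
        }
    }
}
```

The standard Base64 alphabet contains neither `,` nor `;`, which is why the encoded schema can be split safely on those separators even though the raw JSON contains commas.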

I worked around this in the standalone program by creating a new parser for each mapping, as shown below.
    for (String schemaMapping : schemaMappings) {
        String[] split = schemaMapping.split(";");
        String schemaString = fromBase64(split[1]);
        System.out.println(schemaString);
        Schema inputSchema;
        try {
            Schema.Parser schemaParser = new Schema.Parser();
            inputSchema = schemaParser.parse(schemaString);
        } catch (SchemaParseException e) {
            throw new RuntimeException(e);
        }
    }
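
Why a fresh parser per mapping helps: as far as I understand, a Schema.Parser remembers the named types it has already parsed and rejects a second definition of the same full name, which matches the "Can't redefine: com.sample.Test" message. The following is a toy model of that behaviour in plain Java, not Avro's actual code. ToyParser and its define method are hypothetical names, and it throws IllegalStateException rather than SchemaParseException:

```java
import java.util.HashSet;
import java.util.Set;

// Toy model only: illustrates a per-parser table of defined names.
public class ParserNameTableDemo {

    // Hypothetical stand-in for a parser that remembers every named type it has seen.
    public static class ToyParser {
        private final Set<String> definedNames = new HashSet<>();

        public void define(String fullName) {
            // Set.add returns false when the name is already present.
            if (!definedNames.add(fullName)) {
                throw new IllegalStateException("Can't redefine: " + fullName);
            }
        }
    }

    // Reusing one parser for two mappings with the same record name fails.
    public static boolean sharedParserFails() {
        ToyParser shared = new ToyParser();
        shared.define("com.sample.Test");
        try {
            shared.define("com.sample.Test");
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // A new parser per mapping starts with an empty name table, so nothing collides.
    public static boolean freshParsersSucceed() {
        for (int i = 0; i < 2; i++) {
            new ToyParser().define("com.sample.Test");
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("shared parser fails: " + sharedParserFails());
        System.out.println("fresh parsers succeed: " + freshParsersSucceed());
    }
}
```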

Has anyone run into this? Any ideas on how to solve it?

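I also tried copying AvroMultipleInputs into my project and changing the code to use a separate parser per mapping as described above, but then I got the following exception: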

Exception in thread "main" java.lang.NullPointerException
    at org.apache.hadoop.mapred.lib.MultipleInputs.getInputFormatMap(MultipleInputs.java:93)
    at org.apache.hadoop.mapred.lib.DelegatingInputFormat.getSplits(DelegatingInputFormat.java:55)
    at org.apache.hadoop.mapreduce.JobSubmitter.writeOldSplits(JobSubmitter.java:328)
    at org.apache.hadoop.mapreduce.JobSubmitter.writeSplits(JobSubmitter.java:320)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:196)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1290)

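Best answer

In the end I had to customize more files to get it working. I am still not sure whether this has other implications that I am not aware of. The files I copied and modified: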

AvroMultipleInputs.java
DelegatingInputFormat.java

DelegatingMapper.java
MapCollector.java
TaggedInputSplit.java

The original question, "java - AvroMultipleInputs: problem when adding multiple paths", can be found on Stack Overflow: https://stackoverflow.com/questions/37060800/
