java - AVRO Mapreduce-ClassCastException:org.apache.avro.generic.GenericData $ Record无法转换为Avro

标签 java hadoop mapreduce avro mapper

首先,我通过oozie将mapreduce作为Java Action 运行。使用AvroTools提取avro文件的架构,然后将其编译为Java类(RxAvro.java)。

每当我尝试使用Avro对象时,都会收到以下ClassCastException:
java.lang.ClassCastException:org.apache.avro.generic.GenericData $ Record无法转换为com.xxx.yyy.zz.jobs.actions.test.RxAvro

2018-01-05 07:46:07,038 WARN [main] org.apache.hadoop.mapred.YarnChild:

Exception running child : java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to com.xxx.yyy.zzz.jobs.actions.test.RxAvro
        at com.xxx.yyy.zzz.jobs.actions.test.AvroParqueMapreduce$Map.map(AvroParqueMapreduce.java:70)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1796)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

MapReduce代码:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import parquet.avro.AvroParquetInputFormat;

import java.io.IOException;


public class AvroParqueMapreduce extends Configured implements Tool {


    /**
     * Main entry point for the example.
     *
     * @param args arguments
     * @throws Exception when something goes wrong
     */
    public static void main(final String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new AvroParqueMapreduce(), args);
        System.exit(res);
    }

    /**
     * The MapReduce driver - setup and launch the job.
     *
     * @param args the command-line arguments
     * @return the process exit code
     * @throws Exception if something goes wrong
     */
    public int run(final String[] args) throws Exception {


        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        Configuration conf = super.getConf();

        Job job = new Job(conf);
        job.setJarByClass(AvroParqueMapreduce.class);

        job.setInputFormatClass(AvroParquetInputFormat.class);
        AvroParquetInputFormat.setInputPaths(job, inputPath);

        job.setMapperClass(Map.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, outputPath);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static class Map extends Mapper<Void, RxAvro, Text, Text> {

        @Override
        public void map(Void key,
                        RxAvro value,
                        Context context) throws IOException, InterruptedException {

            context.write(new Text(value.getObjectId().toString()),
                    new Text(value.getCollectionInterval().toString()));
        }
    }

任何帮助将不胜感激。

最佳答案

您的代码中有几个问题。您应该按照以下方式正确设置输入格式类,AvroPArquetInputFormat对于avro数据不正确。

    job.setInputFormatClass(AvroKeyInputFormat.class);

除此之外,您需要设置输入架构
    AvroJob.setInputKeySchema(job, RxAvro.getClassSchema());

本教程可以作为https://dzone.com/articles/mapreduce-avro-data-files的起点

关于java - AVRO Mapreduce-ClassCastException:org.apache.avro.generic.GenericData $ Record无法转换为Avro,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48110304/

相关文章:

java - 使用 Java Regex,如何检查字符串是否包含集合中的任何单词?

Hadoop Map Reduce 引用静态对象

hadoop - 使用MapReduce在指定程度内结识 friend

java - ArrayList size() 方法导致程序崩溃

java - 第一次使用 JAXB 将 XML 映射到 Java 类 — 需要一些构建帮助

hadoop - 过滤 hive 复杂数据类型

hadoop - 在 MapReduce 中读取 .tar.gz 文件时出现奇怪的输出

Hadoop分布式文件系统

hadoop - 非 mapreduce 应用程序如何在 YARN 中工作?

java - SCJP 6 和 OCJP 都一样吗?