hadoop - expected org.apache.hadoop.hive.ql.io.orc.OrcStruct, received org.apache.hadoop.hive.ql.io.orc.OrcSerde$OrcSerdeRow

Tags: hadoop mapreduce hive orc

When I read an ORC file and write the data back out to another ORC file, I get the following error:

expected org.apache.hadoop.hive.ql.io.orc.OrcStruct, 
received org.apache.hadoop.hive.ql.io.orc.OrcSerde$OrcSerdeRow

Is my map output value class (MapOutputValue.class) wrong?

This is my program:
package com.baifendian.basicPlatform.hive.ql.io.orc;

import java.io.IOException;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.IOConstants;
import org.apache.hadoop.hive.ql.io.orc.OrcNewInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcNewOutputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class OrcInput {
    final static String inputSchema = "struct<c1:string,c2:string>";

    final static String outputSchema = "struct<c1:string,c2:string>";

    static StructObjectInspector inputOI;
    static SettableStructObjectInspector outputOI;
    static OrcSerde orcsd;

    public static class OrcReaderMap extends
            Mapper<NullWritable, OrcStruct, NullWritable, Writable> {

        public void setup(Context context) throws IOException,
                InterruptedException {
            super.setup(context);
            TypeInfo tfin = TypeInfoUtils
                    .getTypeInfoFromTypeString(inputSchema);
            TypeInfo tfout = TypeInfoUtils
                    .getTypeInfoFromTypeString(outputSchema);

            inputOI = (StructObjectInspector) OrcStruct
                    .createObjectInspector(tfin);
            outputOI = (SettableStructObjectInspector) OrcStruct
                    .createObjectInspector(tfout);

            orcsd = new OrcSerde();
            List<? extends StructField> fldlst = outputOI
                    .getAllStructFieldRefs();
            StringBuffer sbCols = new StringBuffer();
            StringBuffer sbTyps = new StringBuffer();
            for (StructField sf : fldlst) {
                if (sbCols.length() > 0) {
                    sbCols.append(",");
                }
                sbCols.append(sf.getFieldName());
                if (sbTyps.length() > 0) {
                    sbTyps.append(",");
                }
                sbTyps.append(sf.getFieldObjectInspector().getTypeName());
            }
            Properties props = new Properties();
            props.put(IOConstants.COLUMNS, sbCols.toString());
            props.put(IOConstants.COLUMNS_TYPES, sbTyps.toString());
            orcsd.initialize(context.getConfiguration(), props);
        }

        public void map(NullWritable meaningless, OrcStruct orc, Context context)
                throws IOException, InterruptedException {
            List<Object> ilst = inputOI.getStructFieldsDataAsList(orc);
            Text f1 = (Text) ilst.get(0);
            Text f2 = (Text) ilst.get(1);
            // output orc format
            OrcStruct objOut = (OrcStruct) outputOI.create();
            List<? extends StructField> flst = outputOI.getAllStructFieldRefs();
            outputOI.setStructFieldData(objOut, flst.get(0), f1);
            outputOI.setStructFieldData(objOut, flst.get(1), f2);
            context.write(NullWritable.get(), orcsd.serialize(objOut, outputOI));
        }
    }

    public static void main(String[] args) throws IOException,
            InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "OrcReader");
        job.setJarByClass(OrcInput.class);
        job.setInputFormatClass(OrcNewInputFormat.class);
        job.setOutputFormatClass(OrcNewOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path("/warehouse/bae_xinhua_test.db/orcinput"));
        FileOutputFormat.setOutputPath(job, new Path("/warehouse/bae_xinhua_test.db/orcoutput"));
        job.setMapOutputKeyClass(NullWritable.class); 
        job.setMapOutputValueClass(OrcStruct.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(OrcStruct.class);
        job.setMapperClass(OrcInput.OrcReaderMap.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Best answer

Please add job.setNumReduceTasks(0);
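The mismatch happens because OrcSerde.serialize() returns an OrcSerdeRow, while the job declares OrcStruct as the map output value class; as long as there are reduce tasks, the intermediate (shuffle) serialization checks that declaration and fails. With zero reduce tasks the job becomes map-only, so whatever the mapper writes goes straight to OrcNewOutputFormat. A minimal sketch of the adjusted main() body, identical to the question's driver except for the added setNumReduceTasks(0) call:

Configuration conf = new Configuration();
Job job = new Job(conf, "OrcReader");
job.setJarByClass(OrcInput.class);
job.setInputFormatClass(OrcNewInputFormat.class);
job.setOutputFormatClass(OrcNewOutputFormat.class);
FileInputFormat.addInputPath(job, new Path("/warehouse/bae_xinhua_test.db/orcinput"));
FileOutputFormat.setOutputPath(job, new Path("/warehouse/bae_xinhua_test.db/orcoutput"));
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(OrcStruct.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(OrcStruct.class);
job.setMapperClass(OrcInput.OrcReaderMap.class);
// Map-only job: the OrcSerdeRow from orcsd.serialize() is handed directly to
// OrcNewOutputFormat's RecordWriter instead of the shuffle buffer, so the
// declared map output value class is never checked.
job.setNumReduceTasks(0);
System.exit(job.waitForCompletion(true) ? 0 : 1);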

Regarding "hadoop - expected org.apache.hadoop.hive.ql.io.orc.OrcStruct, received org.apache.hadoop.hive.ql.io.orc.OrcSerde$OrcSerdeRow", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/33473877/
