hadoop - 如何使用 AvroParquetOutputFormat 设置多个 Avro 模式?

标签 hadoop avro parquet

在我的 MapReduce 作业中,我使用 AvroParquetOutputFormat 通过 Avro 模式写入 Parquet 文件。

应用程序逻辑需要通过 Reducer 创建多种类型的文件,并且每个文件都有自己的 Avro 架构。

AvroParquetOutputFormat 类有一个静态方法 setSchema() 来设置输出的 Avro 模式。查看代码,AvroParquetOutputFormat 使用 AvroWriteSupport.setSchema() ,这也是一个静态实现。

在不扩展 AvroWriteSupport 和破解逻辑的情况下,是否有一种更简单的方法可以在单个 MR 作业中从 AvroParquetOutputFormat 实现多个 Avro 模式输出?

非常感谢任何指示/输入。

感谢和问候

马可

最佳答案

现在回答可能已经很晚了,但我也遇到过这个问题并提出了解决方案。

首先,parquet-mr 中不支持内置“MultipleAvroParquetOutputFormat”。但是为了实现类似的行为,我使用了 MultipleOutputs

对于只有 map 的工作,像这样放置你的 map :

public class EventMapper extends Mapper<LongWritable, BytesWritable, Void, GenericRecord>{

    protected  KafkaAvroDecoder deserializer;
    protected String outputPath = "";

    // Using MultipleOutputs to write custom named files
    protected MultipleOutputs<Void, GenericRecord> mos;

    public void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        Configuration conf = context.getConfiguration();           
        outputPath = conf.get(FileOutputFormat.OUTDIR);
        mos = new MultipleOutputs<Void, GenericRecord>(context);
    }

    public void map(LongWritable ln, BytesWritable value, Context context){

        try {
            GenericRecord record = (GenericRecord) deserializer.fromBytes(value.getBytes());
            AvroWriteSupport.setSchema(context.getConfiguration(), record.getSchema());
            Schema schema = record.getSchema();
            String mergeEventsPath = outputPath + "/" + schema.getName(); // Adding '/' will do no harm 
            mos.write( (Void) null, record, mergeEventsPath);

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }

}

这将为每个模式创建一个新的 RecordWriter 并创建一个新的 parquet 文件,附加模式名称,例如 schema1-r-0000.parquet。

这还将根据驱动程序中设置的模式创建默认的 part-r-0000x.parquet 文件。为避免这种情况,请使用 LazyOutputFormat,例如:

LazyOutputFormat.setOutputFormatClass(job, AvroParquetOutputFormat.class);

希望这对您有所帮助。

关于hadoop - 如何使用 AvroParquetOutputFormat 设置多个 Avro 模式?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26046818/

相关文章:

hadoop - Kafka-Connect HDFS-Protobuf到 Parquet

hadoop - 通过oozie运行hive脚本脚本,但状态仅为RUNNING

hadoop - hadoop namenode命令是做什么用的

hadoop - 在Oozie中定义Avro key 的架构

hadoop - 将表的属性从 avro.schema.literal 设置为 avro.schema.url 后,Hive avro 表架构未更新

apache-flink - Flink 如何在 S3 中将 DataSet 写成 Parquet 文件?

java - 由 cron 调用时 Hadoop 作业失败

java - 将包含内容的目录从 HDFS 复制到本地文件系统

java - Apache Kafka 和 Avro : org. apache.avro.generic.GenericData$Record 无法转换为 com.harmeetsingh13.java.Customer

java - 转换 Spark 数据集中的数据时数据类型不匹配