java - 如何将嵌套的 avro GenericRecord 转换为 Row

标签 java apache-spark avro spark-avro

我有一段代码可以使用函数 avroToRowConverter()

将我的 avro 记录转换为 Row
directKafkaStream.foreachRDD(rdd -> {
        JavaRDD<Row> newRDD= rdd.map(x->{

            Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(SchemaRegstryClient.getLatestSchema("poc2"));
            return avroToRowConverter(recordInjection.invert(x._2).get());
            });

此函数不适用于嵌套架构 (TYPE= UNION)

private static Row avroToRowConverter(GenericRecord avroRecord) {
    if (null == avroRecord) {
        return null;
    }
    //GenericData
    Object[] objectArray = new Object[avroRecord.getSchema().getFields().size()];
    StructType structType = (StructType) SchemaConverters.toSqlType(avroRecord.getSchema()).dataType();
    for (Schema.Field field : avroRecord.getSchema().getFields()) {

        if(field.schema().getType().toString().equalsIgnoreCase("STRING") || field.schema().getType().toString().equalsIgnoreCase("ENUM")){
            objectArray[field.pos()] = ""+avroRecord.get(field.pos());
        }else {
            objectArray[field.pos()] = avroRecord.get(field.pos());
        }
    }

    return new GenericRowWithSchema(objectArray, structType);
}

谁能建议我如何将复杂架构转换为 ROW?

最佳答案

SchemaConverters.createConverterToSQL 但不幸的是它是私有(private)的。 有 PR 将其公开,但从未合并:

虽然我们使用了一种解决方法。

您可以通过在 com.databricks.spark.avro 包中创建一个类来公开它:

package com.databricks.spark.avro

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DataType

object MySchemaConversions {
  def createConverterToSQL(avroSchema: Schema, sparkSchema: DataType): (GenericRecord) => Row =
    SchemaConverters.createConverterToSQL(avroSchema, sparkSchema).asInstanceOf[(GenericRecord) => Row]
}

然后你可以像这样在你的代码中使用它:

final DataType myAvroType = SchemaConverters.toSqlType(MyAvroRecord.getClassSchema()).dataType();

final Function1<GenericRecord, Row> myAvroRecordConverter =
        MySchemaConversions.createConverterToSQL(MyAvroRecord.getClassSchema(), myAvroType);

Row[] convertAvroRecordsToRows(List<GenericRecord> records) {
    return records.stream().map(myAvroRecordConverter::apply).toArray(Row[]::new);
}

对于一条记录,您可以这样调用它:

final Row row = myAvroRecordConverter.apply(record);

关于java - 如何将嵌套的 avro GenericRecord 转换为 Row,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48828067/

相关文章:

scala - Spark : NullPointerException when RDD isn't collected before map

apache-spark - 如何同时运行 2 个 EMR Spark Step?

hadoop - 简单计数查询超出 Impala 内存限制

Java 1.8 ASM ClassReader 无法解析类文件 - 可能是由于尚不支持新的 Java 类文件版本

java - 使用 SmartPls 更改 Excel 文件中的工作表

java - 如何使用 Java 将 unix 纪元的列转换为 Apache spark DataFrame 中的日期?

c++ - 当字段可为空时,如何使用 C++ 接口(interface)在 Avro 中写入数据?

hadoop - 在不使用 HIVE 的情况下在 HDFS 中以 ORC 格式存储 avro 数据

java - 提供程序 "gs"未安装

java - 以编程方式更改android中的字体