java - 如何在Spark 1.3.1中使用Java读取AVRO数据?

标签 java apache-spark hdfs avro gobblin

我正在尝试开发一个 Java Spark 应用程序,该应用程序从 HDFS 读取 AVRO 记录 ( https://avro.apache.org/ ),该记录由一种名为 Gobblin 的技术放置在那里 ( https://github.com/linkedin/gobblin/wiki )。

示例 HDFS AVRO 数据文件:

/gobblin/work/job-output/KAFKA/kafka-gobblin-hdfs-test/20150910213846_append/part.task_kafka-gobblin-hdfs-test_1441921123461_0.avro

不幸的是,我发现用 Java 编写的示例有限。

我发现的最好的东西是用 Scala 编写的(使用 Hadoop 版本 1 库)。

如有任何帮助,我们将不胜感激。

目前,我正在考虑使用以下代码,但我不确定如何从 AVRO 数据中提取值的 HashMap:

JavaPairRDD avroRDD = sc.newAPIHadoopFile( 
    path, 
    AvroKeyInputFormat.class, 
    AvroKey.class, 
    NullWritable.class, 
    new Configuration() );

// JavaPairRDD avroRDD = sc.newAPIHadoopFile( 
//    path, 
//    AvroKeyValueInputFormat.class, 
//    AvroKey.class, 
//    AvroValue.class, 
//    new Configuration() );

我当前的 Maven 依赖项:

<dependencies>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.3.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.7.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-mapred</artifactId>
        <version>1.7.6</version>
        <classifier>hadoop2</classifier>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.4.3</version>
    </dependency>


    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <scope>test</scope>
    </dependency>

</dependencies>

最佳答案

我编写了一个小型原型(prototype),它能够读取我的示例 Gobblin Avro 记录作为输入,并使用 Spark 输出相关结果 ( spark-hdfs-avro-test )。值得一提的是,我需要解决几个问题。 如有任何意见或反馈,我们将不胜感激。

问题 1:当前 Avro 版本 (1.7.7) 和 Java 序列化存在问题:

引用:

Spark relies on Java's Serializable interface to serialize objects. Avro objects don't implement Serializable. So, to work with Avro objects in Spark, you need to subclass your Avro generated classes and implement Serializable, e.g. https://github.com/massie/spark-parquet-example/blob/master/src/main/scala/com/zenfractal/SerializableAminoAcid.java.

为了解决这个问题,我编写了自己的可序列化包装类:

问题 2:我的 Avro 消息不包含“Key”值。

不幸的是,我无法使用任何现成的输入格式,不得不编写自己的输入格式:AvroValueInputFormat

public class AvroValueInputFormat<T> extends FileInputFormat<NullWritable, AvroValue<T>> {

我无法使用以下内容:

# org.apache.avro.mapreduce.AvroKeyInputFormat
public class AvroKeyInputFormat<T> extends FileInputFormat<AvroKey<T>, NullWritable> {

# org.apache.avro.mapreduce.AvroKeyValueInputFormat
public class AvroKeyValueInputFormat<K, V> extends FileInputFormat<AvroKey<K>, AvroValue<V>> {

问题 3:我无法使用 AvroJob 类 setter 来设置架构值,必须手动执行此操作。

    hadoopConf.set( "avro.schema.input.key", Schema.create( org.apache.avro.Schema.Type.NULL ).toString() ); //$NON-NLS-1$
    hadoopConf.set( "avro.schema.input.value", Event.SCHEMA$.toString() ); //$NON-NLS-1$
    hadoopConf.set( "avro.schema.output.key", Schema.create( org.apache.avro.Schema.Type.NULL ).toString() ); //$NON-NLS-1$
    hadoopConf.set( "avro.schema.output.value", SeverityEventCount.SCHEMA$.toString() ); //$NON-NLS-1$

关于java - 如何在Spark 1.3.1中使用Java读取AVRO数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32766398/

相关文章:

java - Eclipse 与 symbian SDK 的集成

java - 如何从适配器中删除处理程序

scala - 使用 SQLContext 隐式在 Spark 中进行单元测试

scala - 使用 apache Ignite 共享 sparkRDD

hadoop - 将目录从远程 HDFS 本​​地文件系统复制到我的本地机器

hadoop - 使用 JAVA API 授予 hdfs 目标目录的权限

java - 赋值的左边不是变量?

java - Spring,数字验证,默认值,警告消息

python - 在条件列表上使用逻辑AND的PySpark DataFrame过滤器-Numpy All Equivalent

hadoop - HDFS中数据可用性的事件通知?