我正在尝试开发一个 Java Spark 应用程序,该应用程序从 HDFS 读取 AVRO 记录 ( https://avro.apache.org/ ),该记录由一种名为 Gobblin 的技术放置在那里 ( https://github.com/linkedin/gobblin/wiki )。
示例 HDFS AVRO 数据文件:
/gobblin/work/job-output/KAFKA/kafka-gobblin-hdfs-test/20150910213846_append/part.task_kafka-gobblin-hdfs-test_1441921123461_0.avro
不幸的是,我发现用 Java 编写的示例有限。
- https://spark.apache.org/docs/1.3.1/quick-start.html
- https://spark.apache.org/docs/1.3.1/programming-guide.html
我发现的最好的东西是用 Scala 编写的(使用 Hadoop 版本 1 库)。
如有任何帮助,我们将不胜感激。
目前,我正在考虑使用以下代码,但我不确定如何从 AVRO 数据中提取值的 HashMap:
JavaPairRDD avroRDD = sc.newAPIHadoopFile(
path,
AvroKeyInputFormat.class,
AvroKey.class,
NullWritable.class,
new Configuration() );
// JavaPairRDD avroRDD = sc.newAPIHadoopFile(
// path,
// AvroKeyValueInputFormat.class,
// AvroKey.class,
// AvroValue.class,
// new Configuration() );
我当前的 Maven 依赖项:
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.7.6</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<version>1.7.6</version>
<classifier>hadoop2</classifier>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.4.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
最佳答案
我编写了一个小型原型(prototype),它能够读取我的示例 Gobblin Avro 记录作为输入,并使用 Spark 输出相关结果 ( spark-hdfs-avro-test )。值得一提的是,我需要解决几个问题。 如有任何意见或反馈,我们将不胜感激。
问题 1:当前 Avro 版本 (1.7.7) 和 Java 序列化存在问题:
- https://issues.apache.org/jira/browse/AVRO-1502
- http://www.unknownerror.org/opensource/apache/spark/q/stackoverflow/20612571/spark-writing-to-avro-file
引用:
Spark relies on Java's Serializable interface to serialize objects. Avro objects don't implement Serializable. So, to work with Avro objects in Spark, you need to subclass your Avro generated classes and implement Serializable, e.g. https://github.com/massie/spark-parquet-example/blob/master/src/main/scala/com/zenfractal/SerializableAminoAcid.java.
为了解决这个问题,我编写了自己的可序列化包装类:
问题 2:我的 Avro 消息不包含“Key”值。
不幸的是,我无法使用任何现成的输入格式,不得不编写自己的输入格式:AvroValueInputFormat
public class AvroValueInputFormat<T> extends FileInputFormat<NullWritable, AvroValue<T>> {
我无法使用以下内容:
# org.apache.avro.mapreduce.AvroKeyInputFormat
public class AvroKeyInputFormat<T> extends FileInputFormat<AvroKey<T>, NullWritable> {
# org.apache.avro.mapreduce.AvroKeyValueInputFormat
public class AvroKeyValueInputFormat<K, V> extends FileInputFormat<AvroKey<K>, AvroValue<V>> {
问题 3:我无法使用 AvroJob 类 setter 来设置架构值,必须手动执行此操作。
hadoopConf.set( "avro.schema.input.key", Schema.create( org.apache.avro.Schema.Type.NULL ).toString() ); //$NON-NLS-1$
hadoopConf.set( "avro.schema.input.value", Event.SCHEMA$.toString() ); //$NON-NLS-1$
hadoopConf.set( "avro.schema.output.key", Schema.create( org.apache.avro.Schema.Type.NULL ).toString() ); //$NON-NLS-1$
hadoopConf.set( "avro.schema.output.value", SeverityEventCount.SCHEMA$.toString() ); //$NON-NLS-1$
关于java - 如何在Spark 1.3.1中使用Java读取AVRO数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32766398/