scala - 如何使用 Avro 文件上的架构在 Spark 中加载 Avros?

标签 scala hadoop avro apache-spark

我正在从 Cloudera parcel 运行 CDH 4.4 和 Spark 0.9.0。

我有一堆通过 Pig 的 AvroStorage UDF 创建的 Avro 文件。我想使用通用记录或 Avro 文件上的模式将这些文件加载​​到 Spark 中。到目前为止,我已经试过了:

import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.commons.lang.StringEscapeUtils.escapeCsv

import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
import java.net.URI
import java.io.BufferedInputStream
import java.io.File
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.file.DataFileStream
import org.apache.avro.io.DatumReader
import org.apache.avro.file.DataFileReader
import org.apache.avro.mapred.FsInput

val input = "hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/part-m-00016.avro"
val inURI = new URI(input)
val inPath = new Path(inURI)

val fsInput = new FsInput(inPath, sc.hadoopConfiguration)
val reader =  new GenericDatumReader[GenericRecord]
val dataFileReader = DataFileReader.openReader(fsInput, reader)
val schemaString = dataFileReader.getSchema

val buf = scala.collection.mutable.ListBuffer.empty[GenericRecord]
while(dataFileReader.hasNext)  {
  buf += dataFileReader.next
}
sc.parallelize(buf)

这适用于一个文件,但无法扩展 - 我将所有数据加载到本地 RAM,然后从那里将其分布到 spark 节点。

最佳答案

回答我自己的问题:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapred.AvroInputFormat
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.io.NullWritable
import org.apache.commons.lang.StringEscapeUtils.escapeCsv

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
import java.io.BufferedInputStream
import org.apache.avro.file.DataFileStream
import org.apache.avro.io.DatumReader
import org.apache.avro.file.DataFileReader
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.mapred.FsInput
import org.apache.avro.Schema
import org.apache.avro.Schema.Parser
import org.apache.hadoop.mapred.JobConf
import java.io.File
import java.net.URI

// spark-shell -usejavacp -classpath "*.jar"

val input = "hdfs://hivecluster2/securityx/web_proxy_mef/2014/05/29/22/part-m-00016.avro"

val jobConf= new JobConf(sc.hadoopConfiguration)
val rdd = sc.hadoopFile(
  input,
  classOf[org.apache.avro.mapred.AvroInputFormat[GenericRecord]],
  classOf[org.apache.avro.mapred.AvroWrapper[GenericRecord]],
  classOf[org.apache.hadoop.io.NullWritable],
  10)
val f1 = rdd.first
val a = f1._1.datum
a.get("rawLog") // Access avro fields

关于scala - 如何使用 Avro 文件上的架构在 Spark 中加载 Avros?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23944615/

相关文章:

scala - sbt 获取 RootProject 的构建文件夹

scala - Scala 是否具有用于制作不可变数据结构的修改克隆的记录更新语法?

hadoop - 通过Web界面运行Hadoop作业

go - 使用 goavro 创建的 Avro 文件将数字数据加载到 BigQuery

java - 具有两种参数类型的 Scala 向左折叠

function - Scala REPL : How to find function type?

python - Pyspark 列在查找前几行和计算时生成

azure - 使用脚本操作在HDInsight上安装Giraph

java - Avro - 反序列化 POJO

java - Avro GenericRecords、BigQuery 和 Beam