scala - 如何从 HDFS 检索 Avro 数据?

标签 scala apache-spark avro

我已经创建了一个 JSON 数据,并为其创建了一个 Avro 架构:

{"username":"miguno","tweet":"Rock: Nerf paper, scissors is fine.","timestamp": 1366150681 }
{"username":"BlizzardCS","tweet":"Works as intended.  Terran is IMBA.","timestamp": 1366154481 }

{ "type" : "record", "name" : "twitter_schema", "namespace" : "com.miguno.avro", "fields" : [ { "name" : "username", "type" : "string", "doc" : "Name of the user account on Twitter.com" }, { "name" : "tweet", "type" : "string", "doc" : "The content of the user's Twitter message" }, { "name" : "timestamp", "type" : "long", "doc" : "Unix epoch time in seconds" } ], "doc:" : "A basic schema for storing Twitter messages" }

然后我将其转换为 Avro,如下所示:

java -jar ~/avro-tools-1.7.4.jar fromjson --schema-file twitter.avsc twitter.json > twitter.avro

将文件放在 hdfs 上:

hadoop fs -copyFromLocal twitter.avro <path>

然后在 Spark CLI 中我发出了代码:

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable

val path = "hdfs:///path/to/your/avro/folder"
val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](path)

但是在做时:

avroRDD.first

我遇到以下异常:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 2.0 in stage 7.0 (TID 13) had a not serializable result: org.apache.avro.mapred.AvroWrapper at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)

有什么解决办法吗?

最佳答案

Spark 正在尝试序列化/序列化您的 avro 数据,但它不是“java 可序列化”(spark 中使用的默认序列化)。

您有几个选择:

  • 从包装器中提取通用记录并将每个记录映射到某个可序列化的结构
  • 生成特定的记录类并解析它们,而不是通用记录(您仍然需要从包装器中提取记录)
  • 启用 kryo 序列化(这仅在某些情况下有效)

请注意,记录在内部重用,因此,如果您执行 rdd.collect,您最终将得到具有相同值的所有记录。在进行收集之前将原始输入数据映射到其他数据可以解决复制时的问题。

关于scala - 如何从 HDFS 检索 Avro 数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27561281/

相关文章:

scala - Apache Spark Scala - Hive 插入到抛出 "too large frame error"

scala - 退出状态:-100。诊断:容器在“丢失”节点上释放

scala - 管道中 Spark 数据帧中的 OneHotEncoder

cassandra - DataStax 企业 : Spark Cassandra Batch Size

ios - 用C语言将数组数据设置为Avro数组类型

maven - 尝试将数据流式传输到 Kafka 时出现 "Error registering Avro schema"

scala - 如果我在单行数据 block 笔记本中执行命令会花费更少的时间吗?

scala - 如何在scala中对列表列表执行转置?

scala - 如何在 EMR 上使用 Spark 3 解析 Scala 对象的 "Failed to load class"

python - 如何在 Python 中从 S3 读取 Avro 文件?