我已经创建了一个 JSON 数据,并为其创建了一个 Avro 架构:
{"username":"miguno","tweet":"Rock: Nerf paper, scissors is fine.","timestamp": 1366150681 }
{"username":"BlizzardCS","tweet":"Works as intended. Terran is IMBA.","timestamp": 1366154481 }
和
block 引用>{ "type" : "record", "name" : "twitter_schema", "namespace" : "com.miguno.avro", "fields" : [ { "name" : "username", "type" : "string", "doc" : "Name of the user account on Twitter.com" }, { "name" : "tweet", "type" : "string", "doc" : "The content of the user's Twitter message" }, { "name" : "timestamp", "type" : "long", "doc" : "Unix epoch time in seconds" } ], "doc:" : "A basic schema for storing Twitter messages" }
然后我将其转换为 Avro,如下所示:
java -jar ~/avro-tools-1.7.4.jar fromjson --schema-file twitter.avsc twitter.json > twitter.avro
将文件放在 hdfs 上:
hadoop fs -copyFromLocal twitter.avro <path>
然后在 Spark CLI 中我发出了代码:
import org.apache.avro.generic.GenericRecord import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper} import org.apache.hadoop.io.NullWritable val path = "hdfs:///path/to/your/avro/folder" val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](path)
但是在做时:
avroRDD.first
我遇到以下异常:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2.0 in stage 7.0 (TID 13) had a not serializable result: org.apache.avro.mapred.AvroWrapper at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
有什么解决办法吗?
最佳答案
Spark 正在尝试序列化/序列化您的 avro 数据,但它不是“java 可序列化”(spark 中使用的默认序列化)。
您有几个选择:
- 从包装器中提取通用记录并将每个记录映射到某个可序列化的结构
- 生成特定的记录类并解析它们,而不是通用记录(您仍然需要从包装器中提取记录)
- 启用 kryo 序列化(这仅在某些情况下有效)
请注意,记录在内部重用,因此,如果您执行 rdd.collect,您最终将得到具有相同值的所有记录。在进行收集之前将原始输入数据映射到其他数据可以解决复制时的问题。
关于scala - 如何从 HDFS 检索 Avro 数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27561281/