scala - Spark : java. io.NotSerializableException : org. apache.avro.Schema$RecordSchema

标签 scala apache-spark avro

我正在创建 avro RDD使用以下代码。

 def convert2Avro(data : String ,schema : Schema)  : AvroKey[GenericRecord] = {
   var wrapper = new AvroKey[GenericRecord]()
   var record = new GenericData.Record(schema)
   record.put("empname","John")
    wrapper.datum(record)
    return wrapper 
  }

和创造 avro RDD如下。
 var avroRDD = fieldsRDD.map(x =>(convert2Avro(x, schema)))

在执行时,我在上面的行中遇到以下异常
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
        at org.apache.spark.rdd.RDD.map(RDD.scala:270)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:331)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)

任何指针?

最佳答案

Schema.ReocrdSchema类尚未实现 serializable .所以它不能通过网络传输。我们可以将模式转换为字符串并传递给方法,并在方法内部重构模式对象。

var schemaString = schema.toString
var avroRDD = fieldsRDD.map(x =>(convert2Avro(x, schemaString)))

在方法内部重构架构:
def convert2Avro(data : String ,schemaString : String)  : AvroKey[GenericRecord] = {
   var schema = parser.parse(schemaString)
   var wrapper = new AvroKey[GenericRecord]()
   var record = new GenericData.Record(schema)
   record.put("empname","John")
    wrapper.datum(record)
    return wrapper 
  }

关于scala - Spark : java. io.NotSerializableException : org. apache.avro.Schema$RecordSchema,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28412932/

相关文章:

scala - 可选字段,ADT,还是?

scala - Scala 有 0 元组和 1 元组的语法吗?

scala - 使用 Spark 中的数据框以微秒精度解析日期

python - PySpark 窗口不适用于指定的整数范围

java - Scala:如何进行字符串连接以避免 GC 开销问题

r - 在 SparkR 中应用 withColumn 函数和正则表达式模式 : reformat a string column in a DataFrame

java - 在Kafka Streams中使用KStream将字符串更改为avro时出现空指针异常

java - 如何使 Avro 模式中的所有字段都可以为空?

java - 使用 Mapreduce 在 hadoop 中将文本文件转换为 Avrofile

function - Scala val 语法 : What does val myVal:{ def . .. } 是什么意思?