apache-spark - How to fix "java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord" in a Spark Streaming Kafka Consumer?

Tags: apache-spark serialization apache-kafka spark-streaming

  • Spark 2.0.0
  • Apache Kafka 0.10.1.0
  • Scala 2.11.8


  • When I use the Spark Streaming and Kafka integration with Kafka broker version 0.10.1.0 and the following Scala code, it fails with the exception below:
    16/11/13 12:55:20 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
    java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
    Serialization stack:
        - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a))
        - element of array (index: 0)
        - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    

    Why does this happen, and how can I fix it?

    Code:
    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark._
    import org.apache.commons.codec.StringDecoder
    import org.apache.spark.streaming._
    
    object KafkaConsumer_spark_test {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("KafkaConsumer_spark_test").setMaster("local[4]")
        val ssc = new StreamingContext(conf, Seconds(1))
        ssc.checkpoint("./checkpoint")
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "localhost:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "example",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )
    
        val topics = Array("local1")
        val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](topics, kafkaParams)
        )
        stream.map(record => (record.key, record.value))
        stream.print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    Exception:
    16/11/13 12:55:20 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
    java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
    Serialization stack:
        - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a))
        - element of array (index: 0)
        - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:313)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    16/11/13 12:55:20 ERROR TaskSetManager: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
    Serialization stack:
        - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a))
        - element of array (index: 0)
        - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11); not retrying
    16/11/13 12:55:20 ERROR JobScheduler: Error running job streaming job 1479012920000 ms.0
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
    Serialization stack:
        - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a))
        - element of array (index: 0)
        - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
        at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:122)
        at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:50)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:733)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
    Serialization stack:
        - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a))
        - element of array (index: 0)
        - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
        at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:122)
        at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:50)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:733)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    

    Best Answer

    KafkaUtils.createDirectStream returns an org.apache.spark.streaming.dstream.DStream, not an RDD. Spark Streaming creates the underlying RDDs on the fly at run time. To get at them, use stream.foreachRDD to obtain each RDD, and then rdd.foreach to process each object in that RDD. Those objects are the Kafka ConsumerRecords read from the topic; call their value() method to get the message payload:

    stream.foreachRDD { rdd =>
      rdd.foreach { record =>
        // each record is a ConsumerRecord[String, String]; key and value are plain strings
        val value = record.value()
        println(value)
      }
    }
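
    The question's own code actually contains the idiomatic alternative already: stream.map(record => (record.key, record.value)) builds a DStream of plain (String, String) tuples, but its result is discarded, so stream.print() still tries to ship raw ConsumerRecord objects to the driver and fails, because ConsumerRecord is not Serializable. A minimal sketch of that variant (keeping the question's String deserializers; this is not the answer author's code) looks like this:

    // Map each ConsumerRecord to a serializable (key, value) tuple first, then print;
    // the mapped DStream contains only String pairs, so serializing the
    // results back to the driver no longer fails.
    val pairs = stream.map(record => (record.key, record.value))
    pairs.print()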
    

    Regarding apache-spark - How to fix "java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord" in a Spark Streaming Kafka Consumer?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/40570874/
