apache-spark - "java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext"执行 Spark 流时

标签 apache-spark spark-streaming

当我在 yarn 上执行 Spark 流应用程序时,我继续收到以下错误

为什么会发生错误以及如何解决?任何建议都会有所帮助,谢谢~

15/05/07 11:11:50 INFO dstream.StateDStream: Marking RDD 2364 for time 1430968310000 ms for checkpointing
    15/05/07 11:11:50 INFO scheduler.JobScheduler: Added jobs for time 1430968310000 ms
    15/05/07 11:11:50 INFO scheduler.JobGenerator: Checkpointing graph for time 1430968310000 ms
    15/05/07 11:11:50 INFO streaming.DStreamGraph: Updating checkpoint data for time 1430968310000 ms
    15/05/07 11:11:50 INFO streaming.DStreamGraph: Updated checkpoint data for time 1430968310000 ms
    15/05/07 11:11:50 ERROR actor.OneForOneStrategy: org.apache.spark.streaming.StreamingContext
    java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
            at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
            at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
            at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
            at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
            at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
            at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
            at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
            at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
            at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
            at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)

spark流应用程序代码如下,我在spark-shell中执行
    import kafka.cluster.Cluster
import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext._

val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  Some(0)
}

val ssc = new StreamingContext(sc,
  new Duration(5000))
ssc.checkpoint(".")

val lines = KafkaUtils.createStream(ssc, "10.1.10.21:2181", "kafka_spark_streaming", Map("hello_test" -> 3))

val uuidDstream = lines.transform(rdd => rdd.map(_._2)).map(x => (x, 1)).updateStateByKey[Int](updateFunc)
uuidDstream.count().print()

ssc.start()
ssc.awaitTermination()

最佳答案

引用 val updateFuncupdateStateByKey 的闭包内使用正在将该实例的其余部分拉入闭包中并使用 StreamingContext。

两种选择:

  • 快速修复:声明流上下文 transient => @transient val ssc= ...将 dstream 声明注释为 @transient 也是一个好主意。以及。
  • 更好的解决方法:将您的函数放在一个单独的对象中

  • 像这样:
    case object TransformFunctions {
        val updateFunc = ???
    }
    

    关于apache-spark - "java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext"执行 Spark 流时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30091371/

    相关文章:

    scala - 具有动态数据类型的 UDF

    hadoop - spark + hadoop 数据本地化

    python - 使用 PySpark 将复杂 RDD 转换为扁平化 RDD

    scala - EsHadoopIllegalArgumentException:无法检测ES版本Spark-ElasticSearch示例

    scala - 检测 Spark Streaming 中丢失的连接

    python - 如何在流中应用 MLFlow 预测模型?

    apache-spark - Spark Streaming 作业因 ArrayBuffer(kafka.common.NotLeaderForPartitionException) 而失败

    java - Apache Spark 驱动内存

    apache-spark - 如何读取 Spark 工作节点中的文件?

    python - 如何根据条件将字符串数组转换为结构体数组