apache-spark - CheckPointing 时在 foreachRDD() 中使用的对象的序列化

标签 apache-spark spark-streaming rdd avro kryo

根据 this question和我读过的文档,Spark Streaming 的 foreachRDD(someFunction) 将让 someFunction 本身仅在驱动程序进程中执行,但如果在 RDD 上完成了操作,那么它们将是在执行器上完成 - RDD 所在的位置。

以上所有内容也适用于我,尽管我注意到如果我打开检查点,那么似乎 spark 正在尝试序列化 foreachRDD(someFunction) 中的所有内容并发送到某个地方 - 这是对我造成问题,因为使用的对象之一不可序列化(即 schemaRegistryClient)。我尝试了 Kryo 序列化程序,但也没有成功。

如果我关闭检查点,序列化问题就会消失。

有没有办法让 Spark 不序列化 foreachRDD(someFunc) 中使用的内容,同时继续使用检查点?

非常感谢。

最佳答案

Is there a way to let Spark not to serialize what's used in foreachRDD(someFunc) while also keep using checkpointing?

检查点应该与您的问题无关。根本问题是您有一个不可序列化的对象实例需要发送给您的工作人员。

当你有这样的依赖时,有一个通用的模式可以在 Spark 中使用。您创建一个具有惰性 transient 属性的对象,该属性将在需要时加载到工作节点中:

object RegisteryWrapper {
  @transient lazy val schemaClient: SchemaRegisteryClient = new SchemaRegisteryClient()
}

当你需要在 foreachRDD 中使用它时:

someStream.foreachRDD { 
   rdd => rdd.foreachPartition { iterator => 
       val schemaClient = RegisteryWrapper.schemaClient
       iterator.foreach(schemaClient.send(_))
  }
}

关于apache-spark - CheckPointing 时在 foreachRDD() 中使用的对象的序列化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39645485/

相关文章:

apache-spark - 如果一个分区丢失,我们可以使用lineage来重建它。是否会再次加载基础 RDD?

scala - Spark & Scala - 无法从 RDD 中过滤空值

apache-spark - Spark 支持的各种文件/数据格式

apache-spark - 如何使用 spark 数据框评估 spark Dstream 对象

scala - 使用Scala在Apache Spark中连接不同RDD的数据集

python - Pyspark 显示最大值(S)和多重排序

scala - Spark DataFrame 过滤 : retain element belonging to a list

scala - 尝试保存 Spark SQL Dataframes 总是导致空目录

apache-spark - 即使将 "auto.offset.reset"设置为 "latest"后也会出现错误 OffsetOutOfRangeException

java - KafkaConsumer 在轮询时进入无限期等待状态