scala - 如何在 Spark 2.2 中使用 foreachPartition 来避免任务序列化错误

标签 scala apache-spark apache-kafka spark-dataframe spark-streaming

我有以下使用结构化流 (Spark 2.2) 的工作代码,以便从 Kafka (0.10) 读取数据。 我无法解决的唯一问题是在 ForeachWriter 中使用 kafkaProducer 时与 Task 序列化问题 有关。 在我为 Spark 1.6 开发的旧版本代码中,我使用了 foreachPartition 并且为每个分区定义了 kafkaProducer 以避免任务序列化问题。 我如何在 Spark 2.2 中做到这一点?

val df: Dataset[String] = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "test") 
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "true")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)] 
      .map(_._2)

var mySet = spark.sparkContext.broadcast(Map(
  "metadataBrokerList"->metadataBrokerList,
  "outputKafkaTopic"->outputKafkaTopic,
  "batchSize"->batchSize,
  "lingerMS"->lingerMS))

val kafkaProducer = Utils.createProducer(mySet.value("metadataBrokerList"),
                                mySet.value("batchSize"),
                                mySet.value("lingerMS"))

val writer = new ForeachWriter[String] {

    override def process(row: String): Unit = {
         // val result = ...
         val record = new ProducerRecord[String, String](mySet.value("outputKafkaTopic"), "1", result);
        kafkaProducer.send(record)
    }

    override def close(errorOrNull: Throwable): Unit = {}

    override def open(partitionId: Long, version: Long): Boolean = {
      true
    }
}

val query = df
        .writeStream
        .foreach(writer)
        .start

query.awaitTermination()

spark.stop()

最佳答案

编写 ForeachWriter 的实现并使用它。 (避免使用不可序列化对象的匿名类 - 在您的情况下是 ProducerRecord)
示例:val writer = new YourForeachWriter[String]
这里还有一篇关于 Spark 序列化问题的有用文章:https://www.cakesolutions.net/teamblogs/demystifying-spark-serialisation-error

关于scala - 如何在 Spark 2.2 中使用 foreachPartition 来避免任务序列化错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47830947/

相关文章:

sql - scala中的动态where条件生成

Spring Kafka 和一次性交付保证

scala - -Xlint :unsound-match flag do in Scala? 是什么

apache-spark - Spark 失败- future 超时

string - Scala 中的条件隐式函数

amazon-web-services - 是否可以通过 EMR(通过 VPC)查看 Spark UI?

java - 使用 Google Dataflow 在批处理模式下使用 KafkaIO 进行消费

ssl - 加密密码kafka ssl设置

java - 如何测试方法如何处理并发调用

scala - 类型成员的隐式转换