我有以下使用结构化流 (Spark 2.2) 的工作代码,以便从 Kafka (0.10) 读取数据。
我无法解决的唯一问题是在 ForeachWriter
中使用 kafkaProducer
时与 Task 序列化问题
有关。
在我为 Spark 1.6 开发的旧版本代码中,我使用了 foreachPartition
并且为每个分区定义了 kafkaProducer
以避免任务序列化问题。
我如何在 Spark 2.2 中做到这一点?
val df: Dataset[String] = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.option("startingOffsets", "latest")
.option("failOnDataLoss", "true")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
.map(_._2)
var mySet = spark.sparkContext.broadcast(Map(
"metadataBrokerList"->metadataBrokerList,
"outputKafkaTopic"->outputKafkaTopic,
"batchSize"->batchSize,
"lingerMS"->lingerMS))
val kafkaProducer = Utils.createProducer(mySet.value("metadataBrokerList"),
mySet.value("batchSize"),
mySet.value("lingerMS"))
val writer = new ForeachWriter[String] {
override def process(row: String): Unit = {
// val result = ...
val record = new ProducerRecord[String, String](mySet.value("outputKafkaTopic"), "1", result);
kafkaProducer.send(record)
}
override def close(errorOrNull: Throwable): Unit = {}
override def open(partitionId: Long, version: Long): Boolean = {
true
}
}
val query = df
.writeStream
.foreach(writer)
.start
query.awaitTermination()
spark.stop()
最佳答案
编写 ForeachWriter 的实现并使用它。 (避免使用不可序列化对象的匿名类 - 在您的情况下是 ProducerRecord)
示例:val writer = new YourForeachWriter[String]
这里还有一篇关于 Spark 序列化问题的有用文章:https://www.cakesolutions.net/teamblogs/demystifying-spark-serialisation-error
关于scala - 如何在 Spark 2.2 中使用 foreachPartition 来避免任务序列化错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47830947/