In Spark 1.6, I used Spark Streaming together with Kafka like this:
val conf = new SparkConf()
  .setAppName("Test")
  .setMaster("local[*]")
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(60))

val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "host1:port1,host2:port2",
  "group.id" -> "group",
  "auto.offset.reset" -> "largest")

val dstream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet).map(_._2)

dstream.foreachRDD(rdd => { ...
})

ssc.start()
ssc.awaitTermination()
Now I need to do the same thing with Spark 2.2, and I have been reading about Structured Streaming. Do I understand correctly that I should create a streaming DataFrame instead, and that ssc.start() and ssc.awaitTermination() are no longer needed?
Is the following a fully correct replacement for the code shown above?
val spark = SparkSession
  .builder()
  .appName("Test")
  .enableHiveSupport()
  .getOrCreate()

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
Where do I define "group.id" -> "group" and "auto.offset.reset" -> "largest"?
Update:
I also found a slightly different approach:
val rawData: Dataset[(String, String)] = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", metadataBrokerList)
  .option("subscribe", inputKafkaTopic)
  .option("startingOffsets", "latest")
  .option("failOnDataLoss", "true")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
Can I then use rawData as in rawData.foreachRDD(rdd => { ... })?
Best Answer
No. You still need a query (a sink). In its simplest form:
val keyValueDf = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

val query = keyValueDf
  .writeStream
  .format("console")
  .start()
and awaitTermination (or something similar, like spark.streams.awaitAnyTermination):
query.awaitTermination()
See Starting Streaming Queries and Managing Streaming Queries.
To execute arbitrary code in the sink (e.g. the equivalent of foreachRDD), you can try writeStream.foreach and define a ForeachWriter:
val writer: ForeachWriter[Row]
df.writeStream.foreach(writer).start()
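To give an idea of the shape of such a writer, here is a minimal sketch; the println in process is only a placeholder for your own per-record logic, and the "value" column name assumes the Kafka source schema:

```scala
import org.apache.spark.sql.{ForeachWriter, Row}

val writer = new ForeachWriter[Row] {
  // Called once per partition per trigger; return false to skip the partition.
  override def open(partitionId: Long, version: Long): Boolean = true

  // Called for every record in the partition.
  override def process(record: Row): Unit =
    println(record.getAs[String]("value")) // placeholder for real per-record work

  // Called at the end of the partition; errorOrNull is non-null on failure.
  override def close(errorOrNull: Throwable): Unit = ()
}
```

open/process/close are the three methods ForeachWriter requires; resources such as connections should be acquired in open and released in close, since the writer is serialized and run on the executors.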
For the available options, see Kafka Specific Configurations.
group.id is unique to each query: "Kafka source will create a unique group id for each query automatically."
startingOffsets should be used instead of auto.offset.reset: "auto.offset.reset: Set the source option startingOffsets to specify where to start instead. Structured Streaming manages which offsets are consumed internally, rather than rely on the kafka Consumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed. Note that startingOffsets only applies when a new streaming query is started, and that resuming will always pick up from where the query left off."
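Besides "earliest" and "latest", startingOffsets also accepts a JSON string with explicit per-partition offsets; the topic name and offsets below are placeholders, and per the Kafka integration guide -2 means earliest and -1 means latest for a partition:

```scala
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  // start partition 0 at offset 23, partition 1 at the earliest offset
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2}}""")
  .load()
```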
Regarding scala - Spark streaming (Spark 1.6) vs Structured Streaming (Spark 2.2), we found a similar question on Stack Overflow: https://stackoverflow.com/questions/47797190/