scala - Spark Streaming (Spark 1.6) vs Structured Streaming (Spark 2.2)

Tags: scala apache-spark spark-streaming

In Spark 1.6, I was using Spark Streaming with Kafka as below:

val conf = new SparkConf()
              .setAppName("Test")
              .setMaster("local[*]")
val sc = new SparkContext(conf)

val ssc = new StreamingContext(sc, Seconds(60))

val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "host1:port1,host2:port2",
  "group.id" -> "group",
  "auto.offset.reset" -> "largest")

val dstream = KafkaUtils
  .createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
  .map(_._2)

dstream.foreachRDD(rdd => { ...
})

ssc.start()

ssc.awaitTermination()

Now I need to do the same using Spark 2.2. I am reading about Structured Streaming. Do I understand correctly that I should create a streaming DataFrame, and that ssc.start() and ssc.awaitTermination() are not needed?

Would the following be a fully correct replacement for the code shown above?

val spark = SparkSession
          .builder()
          .appName("Test")
          .enableHiveSupport()
          .getOrCreate()

val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1")
      .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]

Where do I define "group.id" -> "group" and "auto.offset.reset" -> "largest"?

Update:

I also found a slightly different approach:

val rawData = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", metadataBrokerList)
      .option("subscribe", inputKafkaTopic)
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "true")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]

Can I use rawData like this: rawData.foreachRDD(rdd => { ... })?

Best answer

No, it won't. You still need a query (a sink). In its simplest form:

val keyValueDf = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

val query = keyValueDf
  .writeStream
  .format("console")
  .start()

and awaitTermination (or something equivalent, like spark.streams.awaitAnyTermination):

query.awaitTermination()

See Starting Streaming Queries and Managing Streaming Queries.

To execute arbitrary code in the sink (like foreachRDD), you can try writeStream.foreach and define a ForeachWriter:

val writer: ForeachWriter[Row]
df.writeStream.foreach(writer).start()
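
As an illustration (a minimal sketch, not part of the original answer), a ForeachWriter for the df above that simply prints every record could look like this; in practice open and close would acquire and release a connection to an external system:

import org.apache.spark.sql.{ForeachWriter, Row}

// Illustrative only: prints each record to stdout.
val writer = new ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long): Boolean = true // e.g. open a connection
  override def process(record: Row): Unit = println(record)           // per-record logic
  override def close(errorOrNull: Throwable): Unit = ()               // e.g. release the connection
}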

For the available options, see Kafka Specific Configurations.

  • group.id is unique to the query:

    group.id: Kafka source will create a unique group id for each query automatically.

  • startingOffsets should be used instead of auto.offset.reset (see the sketch after this list):

    auto.offset.reset: Set the source option startingOffsets to specify where to start instead. Structured Streaming manages which offsets are consumed internally, rather than rely on the kafka Consumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed. Note that startingOffsets only applies when a new streaming query is started, and that resuming will always pick up from where the query left off.
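
For example (an illustrative snippet, assuming the same broker list and topic as above), the counterpart of auto.offset.reset -> "largest" is passed as a source option:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("startingOffsets", "latest") // "largest" in the old consumer maps to "latest" here
  .load()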

For scala - Spark Streaming (Spark 1.6) vs Structured Streaming (Spark 2.2), a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/47797190/
