apache-spark - Spark Streaming 连续作业

标签 apache-spark rabbitmq spark-streaming

我有一个 Spark Streaming 作业,它实现了自定义接收器。该接收器从队列中获取记录,直到队列耗尽(或满足间隔),然后将这些记录返回到主机上下文以写入数据库。

写入这些记录后,我希望主机进程启动新的接收器并继续处理,如何使用 API 执行此操作?

主机进程如下所示:

def main(args: Array[String]) {
  val config = new SparkConf()
  config.setAppName("Vehicle Data Queue Consumer")
  config.set("spark.driver.allowMultipleContexts", "true")

  val streamContext = new StreamingContext(config, Seconds(1) )

  val rStream = generateReceiverStream(streamContext)

  val sparkContext = new SparkContext(config)

  streamContext.start()

  streamContext.awaitTermination()
}

def generateReceiverStream(aContext: StreamingContext): ReceiverInputDStream[List[String]] = {

  val rmqReceiver = new RMQReceiver("amqp://myQueue")
  val customReceiverStream = aContext.receiverStream(rmqReceiver)

  val handler = (rdd: RDD[List[String]]) => this.handleStreamResult(rdd)

  customReceiverStream.foreachRDD(handler)


  return customReceiverStream
}

def handleStreamResult(rdd: RDD[List[String]]): Unit ={
  rdd.foreach { record =>
    record.foreach { aString =>
      println("****************************")
      println(s"$aString")
      println("****************************")
    }
  }
}

最佳答案

接收器在流上下文开始时实例化一次,并且预计在 Spark Streaming 作业处于事件状态时处于“事件状态”。 Spark Streaming 将在接收器上使用 onStart()onStop 方法来管理其生命周期。

在 Spark Streaming 中,接收器 应在事件时同时生成数据。也就是说,在调用 receiver.start() 之后,自定义接收器应该创建并管理自己的线程,其中对 store(...) 的调用将为逻辑生成数据。从该接收器创建的 DStream。

Spark Streaming 将管理接收器生命周期以处理故障场景和 streamingContext.stop 调用。因此,在启动流上下文后,没有必要甚至不可能以编程方式“启动新接收器”。

设计并实现您的自定义接收器以遵循此预期行为,并且作业将连续运行,无需任何进一步的努力。

关于apache-spark - Spark Streaming 连续作业,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29684226/

相关文章:

apache-spark - 何时使用 mapParitions 和 mapPartitionsWithIndex?

jms - openwire vs amqp,哪个性能更好

apache-spark - 如何检查结构化流中的StreamingQuery性能指标?

java springrabbitmq让一个队列等待另一个队列

spark-streaming - 结构化流 Python API

java - 为什么 KafkaUtils.createStream() 的 "topics"参数是一个 Map 而不是数组?

apache-spark - 为什么Spark不使用本地计算机上的所有内核

scala - 通过 Scala 读取 Spark 3 中的 netcdf 文件

apache-spark - 线程 "main"java.lang.NoClassDefFoundError : com/typesafe/config/ConfigFactory 中的异常

node.js - AMQPLIB - NodeJS - 在 RabbitMQ 中断言惰性队列