scala - 使用 Kafka 在长时间运行的 Spark 作业之间进行通信

标签 scala apache-spark apache-kafka spark-streaming long-running-processes

我是 Apache Spark 新手,需要在 Spark 集群上同时运行多个长时间运行的进程(作业)。通常,这些单独的进程(每个进程都有自己的工作)需要相互通信。我暂时考虑使用 Kafka 作为这些流程之间的代理。因此,高层的工作间沟通如下所示:

  1. 作业 #1 执行一些工作并将消息发布到 Kafka 主题
  2. 作业 #2 设置为同一 Kafka 主题的流接收器(使用 StreamingContext),一旦消息发布到该主题,作业 #2 就会使用它<
  3. 作业 #2 现在可以根据它消耗的消息执行一些工作

据我所知,流上下文正在阻止在 Spark Driver 节点上运行的监听器。这意味着一旦我像这样启动流消费者:

def createKafkaStream(ssc: StreamingContext,
        kafkaTopics: String, brokers: String): DStream[(String, 
        String)] = {
    // some configs here
    KafkaUtils.createDirectStream[String, String, StringDecoder,
        StringDecoder](ssc, props, topicsSet)
}

def consumerHandler(): StreamingContext = {
    val ssc = new StreamingContext(sc, Seconds(10))

    createKafkaStream(ssc, "someTopic", "my-kafka-ip:9092").foreachRDD(rdd => {
        rdd.collect().foreach { msg =>
            // Now do some work as soon as we receive a messsage from the topic
        }
    })

    ssc
}

StreamingContext.getActive.foreach {
    _.stop(stopSparkContext = false)
}

val ssc = StreamingContext.getActiveOrCreate(consumerHandler)
ssc.start()
ssc.awaitTermination()

...现在有两个含义:

  1. 驱动程序现在正在阻塞并监听来自 Kafka 的工作以进行消费;和
  2. 收到工作(消息)后,它们会被发送到任何可用的工作节点以实际执行

所以首先,如果我上面所说的任何内容不正确或有误导性,请先纠正我!假设我或多或少是正确的,那么我只是想知道,根据我的标准,是否有一种更具可扩展性或性能的方法来实现这一目标。同样,我有两个长时间运行的作业(作业 #1 和作业 #2)在我的 Spark 节点上运行,其中一个需要能够将工作“发送到”另一个。有什么想法吗?

最佳答案

From what I can tell, streaming contexts are blocking listeners that run on the Spark Driver node.

A StreamingContext(单数)不是阻塞监听器。它的工作是为您的流作业创建执行图。

当您开始从 Kafka 读取数据时,您指定要每 10 秒获取一次新记录。从现在开始发生的情况取决于您为 Kafka 使用的 Kafka 抽象,是通过 KafkaUtils.createStream 的 Receiver 方法,还是通过 KafkaUtils.createDirectStream 的无接收器方法.

通常,在这两种方法中,数据都是从 Kafka 中消费,然后分派(dispatch)给每个 Spark 工作线程以并行处理。

then I'm simply wondering if there is a more scalable or performant way to accomplish this

这种方法具有高度可扩展性。当使用无接收器方法时,每个 Kafka 分区都会映射到给定 RDD 中的一个 Spark 分区。您可以通过增加 Kafka 中的分区数量或通过在 Spark 中重新分区数据(使用 DStream.repartition )来提高并行性。我建议测试此设置以确定它是否适合您的性能要求。

关于scala - 使用 Kafka 在长时间运行的 Spark 作业之间进行通信,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38957594/

相关文章:

scala - 副作用 mutable.Map scala

scala - 拆分 Play ! 2 应用成模块

java - 根据 DataStax Enterprise 的运行时类路径构建 Spark 应用程序

java - Apache Spark 行从时间戳到日期转换异常

java - 如何让KafkaProducer使用模拟的模式注册表进行测试?

scala - 如何选择由不同的SBT库依赖项添加的特定版本的程序包

json - 使用 Spark 解析 JSON 文件并提取键和值

java - 如何在 Java 中使用 Spark 的 .newAPIHadoopFile()

apache-kafka - Kafka 单个消费者在一个组中失败

maven - 客户端管理员的 Apache Kafka Maven 包名称