scala - 在 Spark Streaming 中重用 kafka producer

标签 scala apache-spark apache-kafka spark-streaming kafka-producer-api

我们有一个 spark 流应用程序(以下是代码),它从 kafka 获取数据并在将数据插入 MongoDB 之前(对每条消息)进行一些转换。我们有一个中间件应用程序将消息(批量)推送到 Kafka 并等待来自 Spark 流应用程序的确认(对于每条消息)。如果在将消息发送到 Kafka 后一定时间内(5 秒)中间件没有收到确认,中间件应用程序将重新发送消息。 Spark Streaming 应用程序能够接收大约 50-100 条消息(一批)并在 5 秒内发送所有消息的确认。但是,如果中间件应用程序推送超过 100 条消息,则由于 spark streaming 发送确认的延迟,会导致中间件应用程序重新发送消息。在我们当前的实现中,每次我们要发送确认时都会创建生产者,这需要 3-4 秒。

package com.testing

import org.apache.spark.streaming._
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.streaming.kafka._
import org.apache.spark.sql.{ SQLContext, Row, Column, DataFrame }
import java.util.HashMap
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig, ProducerRecord }
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

import org.joda.time._
import org.joda.time.format._

import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import com.mongodb.util.JSON

import scala.io.Source._
import java.util.Properties
import java.util.Calendar

import scala.collection.immutable
import org.json4s.DefaultFormats


object Sample_Streaming {

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setAppName("Sample_Streaming")
      .setMaster("local[4]")

    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")

    val sqlContext = new SQLContext(sc)
    val ssc = new StreamingContext(sc, Seconds(1))

    val props = new HashMap[String, Object]()


    val bootstrap_server_config = "127.0.0.100:9092"
    val zkQuorum = "127.0.0.101:2181"



    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server_config)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    val TopicMap = Map("sampleTopic" -> 1)
    val KafkaDstream = KafkaUtils.createStream(ssc, zkQuorum, "group", TopicMap).map(_._2)

      val schemaDf = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource")
        .option("spark.mongodb.input.uri", "connectionURI")
        .option("spark.mongodb.input.collection", "schemaCollectionName")
        .load()

      val outSchema = schemaDf.schema
      var outDf = sqlContext.createDataFrame(sc.emptyRDD[Row], outSchema)

    KafkaDstream.foreachRDD(rdd => rdd.collect().map { x =>
      {
        val jsonInput: JValue = parse(x)


        /*Do all the transformations using Json libraries*/

        val json4s_transformed = "transformed json"

        val rdd = sc.parallelize(compact(render(json4s_transformed)) :: Nil)
        val df = sqlContext.read.schema(outSchema).json(rdd)

        df.write.option("spark.mongodb.output.uri", "connectionURI")
                  .option("collection", "Collection")
                  .mode("append").format("com.mongodb.spark.sql").save()

        val producer = new KafkaProducer[String, String](props)
        val message = new ProducerRecord[String, String]("topic_name", null, "message_received")

        producer.send(message)
        producer.close()


      }

    }

    )

    // Run the streaming job
    ssc.start()
    ssc.awaitTermination()
  }

}

所以我们尝试了另一种方法,在 foreachRDD 之外创建生产者,并在整个批处理间隔内重用它(以下是代码)。这似乎有所帮助,因为我们并不是每次要发送确认时都创建生产者。但不知为何,当我们在spark UI上监控应用时,streaming应用的内存消耗在稳步增长,而之前并不是这样。我们尝试在 spark-submit 中使用 --num-executors 1 选项来限制由 yarn 启动的执行程序的数量。

    object Sample_Streaming {

    def main(args: Array[String]) {

    val sparkConf = new SparkConf().setAppName("Sample_Streaming")
      .setMaster("local[4]")

    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")

    val sqlContext = new SQLContext(sc)
    val ssc = new StreamingContext(sc, Seconds(1))

    val props = new HashMap[String, Object]()


    val bootstrap_server_config = "127.0.0.100:9092"
    val zkQuorum = "127.0.0.101:2181"



    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_server_config)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    val TopicMap = Map("sampleTopic" -> 1)
    val KafkaDstream = KafkaUtils.createStream(ssc, zkQuorum, "group", TopicMap).map(_._2)

      val schemaDf = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource")
        .option("spark.mongodb.input.uri", "connectionURI")
        .option("spark.mongodb.input.collection", "schemaCollectionName")
        .load()

      val outSchema = schemaDf.schema
    val producer = new KafkaProducer[String, String](props)
    KafkaDstream.foreachRDD(rdd => 
          {

            rdd.collect().map ( x =>
            {

              val jsonInput: JValue = parse(x)


              /*Do all the transformations using Json libraries*/

              val json4s_transformed = "transformed json"

              val rdd = sc.parallelize(compact(render(json4s_transformed)) :: Nil)
              val df = sqlContext.read.schema(outSchema).json(rdd)

              df.write.option("spark.mongodb.output.uri", "connectionURI")
                        .option("collection", "Collection")
                        .mode("append").format("com.mongodb.spark.sql").save()


              val message = new ProducerRecord[String, String]("topic_name", null, "message_received")

              producer.send(message)
              producer.close()


            }

            )
        }

    )

    // Run the streaming job
    ssc.start()
    ssc.awaitTermination()
  }

}

我的问题是:

  1. 如何监控 spark 应用程序的内存消耗,目前我们每 5 分钟手动监控一次应用程序,直到它耗尽集群中的可用内存(每个 2 个节点 16GB)?

  2. 在使用 Spark 流和 kafka 时业界遵循的最佳实践是什么?

最佳答案

Kafka 是一个经纪人:它为生产者和消费者提供交付保证。在生产者和消费者之间实现“over the top”确认机制有点矫枉过正。确保生产者行为正确,消费者在出现故障时可以恢复,并确保端到端的交付。

关于这项工作,难怪它的性能很差:处理是按顺序进行的,逐个元素,直到写入外部数据库为止。这是明显错误,应该在尝试修复任何内存消耗问题之前解决。

这个过程可以像这样改进:

val producer = // create producer

val jsonDStream = kafkaDstream.transform{rdd => rdd.map{elem => 
    val json = parse(elem)
    render(doAllTransformations(json)) // output should be a String-formatted JSON object
  }
}

jsonDStream.foreachRDD{ rdd => 
  val df = sqlContext.read.schema(outSchema).json(rdd) // transform the complete collection, not element by element
  df.write.option("spark.mongodb.output.uri", "connectionURI") // write in bulk, not one by one
    .option("collection", "Collection")
    .mode("append").format("com.mongodb.spark.sql").save()
  val msg = //create message  
  producer.send(msg)
  producer.flush() // force send. *DO NOT Close* otherwise it will not be able to send any more messages
}

如果我们可以用 case class 实例替换所有以字符串为中心的 JSON 转换,则可以进一步改进此过程。

关于scala - 在 Spark Streaming 中重用 kafka producer,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45663798/

相关文章:

apache-spark - 使用 Spark JDBC 时 DataFrame 列的自定义数据类型

go - 如何使用 Sarama Go Kafka Consumer 从最新的偏移量中消费

redis - 如何在 Kafka 中制作子主题

apache-kafka - -bash : kafka-server-start. sh:找不到命令

Scala 谓词组合

scala - 在 Scala 中编辑 csv 文件

java - 在 pyspark 上运行 sql 查询时出现 MetaException(message :java. lang.IllegalArgumentException : java.net.UnknownHostException)

apache-spark - ShuffledRDD、MapPartitionsRDD 和 ParallelCollectionRDD 之间有什么区别?

scala - Spark RDD 将一行数据映射为多行

java - 在服务接口(interface)上,公开 doSomethingOnUser(user : User) or doSomethingOnUser(userId: String)