scala - Materialize mapWithState stateSnapShots to a database for later resume of the Spark streaming app

Tags: scala apache-spark spark-streaming

I have a Spark Scala streaming application that uses mapWithState to sessionize user-generated events coming from Kafka. I would like to round out the setup by making it possible to pause and resume the application during maintenance. I already write the Kafka offset information to a database, so when restarting the application I can pick up at the last processed offsets. But I also want to keep the state information.
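
For reference, a minimal, hedged sketch of how those offsets can be captured per batch from the direct stream defined later in the post, using the Spark 1.x Kafka 0.8 API that the listing imports (the saveOffsets helper is a hypothetical stand-in for the actual database write):

    import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

    // Hypothetical helper: persist one (topic, partition, offset) row; the implementation is app-specific.
    def saveOffsets(topic: String, partition: Int, untilOffset: Long): Unit = ???

    kafkaStream.foreachRDD { rdd =>
      // Each RDD of a direct stream carries the Kafka offset ranges it was built from.
      val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      ranges.foreach(r => saveOffsets(r.topic, r.partition, r.untilOffset))
    }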

So my goals are:

  • Materialize the session information after the key identifying a user times out.
  • Materialize the .stateSnapshot() when I gracefully shut down the application, so I can use that data when restarting the application by feeding it to StateSpec as a parameter.

  1 is working; 2 is where I'm having problems.

    For completeness I'll also describe 1, since I'm always interested in a better solution for it:

    1) Materializing the session information after a key times out

    In my mapWithState update function, I have:
      if (state.isTimingOut()) {
        // if the key is timing out, emit the accumulated session marked with the flag.
        val output = (key, stateFilterable(isTimingOut = true
          , start = state.get().start
          , end = state.get().end
          , duration = state.get().duration
          , pages = state.get().pages
          , events = state.get().events
        ))
        return Some(output)
      }

    That sets the isTimingOut boolean, which I then use like this:
    streamParsed
      .filter(a => a._2.isTimingOut)
      .foreachRDD(rdd =>
        rdd
          .map(stuff => Model(user_key = stuff._1,
            start = stuff._2.start,
            duration = stuff._2.duration,
            pages = stuff._2.pages,
            events = stuff._2.events))
          .saveToCassandra(keyspaceName, tableName)
      )
    

    2) Materializing the .stateSnapshot() on graceful shutdown

    Materializing the snapshot information does not work. What I tried:
    // define a class Listener
    class Listener(ssc: StreamingContext, state: DStream[(String, stateFilterable)]) extends Runnable {
      def run {
        if( ssc == null )
          System.out.println("The spark context is null")
        else
          System.out.println("The spark context is fine!!!")
    
        var input = "continue"
        while( !input.equals("D")) {
          input = readLine("Press D to kill: ")
          System.out.println(input + " " + input.equals("D"))
        }
    
        System.out.println("Accessing snapshot and saving:")
        state.foreachRDD(rdd =>
          rdd
            .map(stuff => Model(user_key = stuff._1,
              start = stuff._2.start,
              duration = stuff._2.duration,
              pages = stuff._2.pages,
              events = stuff._2.events))
            .saveToCassandra("some_keyspace", "some_table")
        )
    
        System.out.println("Stopping context!")
        ssc.stop(true, true)
        System.out.println("We have stopped!")
      }
    }
    
    // Inside the app object:
    val state = streamParsed.stateSnapshots()
    var listener = new Thread(new Listener(ssc, state))
    listener.start()
    

    So the full code becomes:
    package main.scala.cassandra_sessionizing
    
    import java.text.SimpleDateFormat
    import java.util.Calendar
    import org.apache.spark.streaming.dstream.{DStream, MapWithStateDStream}
    import scala.collection.immutable.Set
    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.Duration
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType, LongType, ArrayType, IntegerType}
    import _root_.kafka.serializer.StringDecoder
    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql.CassandraConnector
    
    case class userAction(datetimestamp: Double
                          , action_name: String
                          , user_key: String
                          , page_id: Integer
                         )
    
    case class actionTuple(pages: scala.collection.mutable.Set[Int]
                           , start: Double
                           , end: Double)
    
    case class stateFilterable(isTimingOut: Boolean
                               , start: Double
                               , end: Double
                               , duration: Int
                               , pages: Set[Int]
                               , events: Int
                              )
    
    case class Model(user_key: String
                     , start: Double
                     , duration: Int
                     , pages: Set[Int]
                     , events: Int
                    )
    
    class Listener(ssc: StreamingContext, state: DStream[(String, stateFilterable)]) extends Runnable {
      def run {
        var input = "continue"
        while( !input.equals("D")) {
          input = readLine("Press D to kill: ")
          System.out.println(input + " " + input.equals("D"))
        }
    
        // Accessing snapshot and saving:
        state.foreachRDD(rdd =>
          rdd
            .map(stuff => Model(user_key = stuff._1,
              start = stuff._2.start,
              duration = stuff._2.duration,
              pages = stuff._2.pages,
              events = stuff._2.events))
            .saveToCassandra("keyspace1", "snapshotstuff")
        )
    
        // Stopping context
        ssc.stop(true, true)
      }
    }
    
    object cassandra_sessionizing {
    
      // where we'll store the stuff in Cassandra
      val tableName = "sessionized_stuff"
      val keyspaceName = "keyspace1"
    
      def main(args: Array[String]): Unit = {
    
        val conf = new SparkConf().setAppName("cassandra-sessionizing")
          .set("spark.cassandra.connection.host", "10.10.10.10")
          .set("spark.cassandra.auth.username", "keyspace1")
          .set("spark.cassandra.auth.password", "blabla")
    
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
    
        // setup the cassandra connector and recreate the table we'll use for storing the user session data.
        val cc = CassandraConnector(conf)
        cc.withSessionDo { session =>
          session.execute(s"""DROP TABLE IF EXISTS $keyspaceName.$tableName;""")
          session.execute(
            s"""CREATE TABLE IF NOT EXISTS $keyspaceName.$tableName (
                  user_key TEXT
                , start DOUBLE
                , duration INT
                , pages SET<INT>
                , events INT
                , PRIMARY KEY(user_key, start)) WITH CLUSTERING ORDER BY (start DESC)
                ;""")
        }
    
        // setup the streaming context and make sure we can checkpoint, given we're using mapWithState.
        val ssc = new StreamingContext(sc, Seconds(60))
        ssc.checkpoint("hdfs:///user/keyspace1/streaming_stuff/")
    
        // Defining the stream connection to Kafka.
        val kafkaStream = {
          KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,
            Map("metadata.broker.list" -> "kafka1.prod.stuff.com:9092,kafka2.prod.stuff.com:9092"), Set("theTopic"))
        }
    
        // this schema definition is needed so the json string coming from Kafka can be parsed into a dataframe using spark read.json.
        // if an event does not conform to this structure, it will result in all null values, which are filtered out later.
        val struct = StructType(
          StructField("datetimestamp", DoubleType, nullable = true) ::
            StructField("action", StructType(
              StructField("user_key", StringType, nullable = true) ::
              StructField("page_id", IntegerType, nullable = true) ::
              StructField("name", StringType, nullable = true) :: Nil), nullable = true) :: Nil
        )
    
        /*
        this is the function needed to keep track of a user key's session.
        3 options:
        1) key already exists, and new values are coming in to be added to the state.
        2) key is new, so initialize the state with the incoming value
        3) key is timing out, so mark it with a boolean that can be used by filtering later on. Given the boolean, the data can be materialized to cassandra.
       */
    
        def trackStateFunc(batchTime: Time
                           , key: String 
                           , value: Option[actionTuple]
                           , state: State[stateFilterable])
        : Option[(String, stateFilterable)] = {
    
          // 1 : if key already exists and we have a new value for it
          if (state.exists() && value.orNull != null) {
            var current_set = state.getOption().get.pages
            var current_start = state.getOption().get.start
            var current_end = state.getOption().get.end
    
            if (value.get.pages != null) {
              current_set ++= value.get.pages
            }
    
            current_start = Array(current_start, value.get.start).min // the starting epoch is used to initialize the state, but maybe some earlier events are processed a bit later.
            current_end = Array(current_end, value.get.end).max // always update the end time of the session with new events coming in.
            val new_event_counter = state.getOption().get.events + 1
            val new_output = stateFilterable(isTimingOut = false
              , start = current_start
              , end = current_end
              , duration = (current_end - current_start).toInt
              , pages = current_set
              , events = new_event_counter)
    
            val output = (key, new_output)
            state.update(new_output)
            return Some(output)
          }
    
          // 2: if key does not exist and we have a new value for it
          else if (value.orNull != null) {
            var new_set: Set[Int] = Set()
            val current_value = value.get.pages
            if (current_value != null) {
              new_set ++= current_value
            }
    
            val event_counter = 1
            val current_start = value.get.start
            val current_end = value.get.end
    
            val new_output = stateFilterable(isTimingOut = false
              , start = current_start
              , end = current_end
              , duration = (current_end - current_start).toInt
              , pages = new_set
              , events = event_counter)
    
            val output = (key, new_output)
            state.update(new_output)
            return Some(output)
          }
    
          // 3: if key is timing out
          if (state.isTimingOut()) {
            val output = (key, stateFilterable(isTimingOut = true
              , start = state.get().start
              , end = state.get().end
              , duration = state.get().duration
              , pages = state.get().pages
              , events = state.get().events
            ))
            return Some(output)
          }
    
          // this part of the function should never be reached.
          throw new Error(s"Entered dead end with $key $value")
        }
    
        // defining the state specification used later on as a step in the stream pipeline.
        val stateSpec = StateSpec.function(trackStateFunc _)
          .numPartitions(16)
          .timeout(Seconds(4000))
    
        // RDD 1
        val streamParsedRaw = kafkaStream
          .map { case (k, v: String) => v } // key is empty, so get the value containing the json string.
          .transform { rdd =>
          val df = sqlContext.read.schema(struct).json(rdd) // apply schema defined above and parse the json into a dataframe,
            .selectExpr("datetimestamp" 
            , "action.name AS action_name"
            , "action.user_key"
            , "action.page_id"
          )
          df.as[userAction].rdd // transform dataframe into spark Dataset so we easily cast to the case class userAction.
        }
    
        val initialCount = actionTuple(pages = collection.mutable.Set(), start = 0.0, end = 0.0)
    
        val addToCounts = (left: actionTuple, ua: userAction) => {
          val current_start = ua.datetimestamp
          val current_end = ua.datetimestamp 
          if (ua.page_id != null) left.pages += ua.page_id
          actionTuple(left.pages, current_start, current_end)
        }
    
        val sumPartitionCounts = (p1: actionTuple, p2: actionTuple) => {
          val current_start = Array(p1.start, p2.start).min 
          val current_end = Array(p1.end, p2.end).max 
          actionTuple(p1.pages ++= p2.pages, current_start, current_end)
        }
    
        // RDD 2: add the mapWithState part.
        val streamParsed = streamParsedRaw
          .map(s => (s.user_key, s)) // create key value tuple so we can apply the mapWithState to the user_key.
          .transform(rdd => rdd.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)) // reduce to one row per user key for each batch.
          .mapWithState(stateSpec)
    
        // RDD 3: if the app is shutdown, this rdd should be materialized.
        val state = streamParsed.stateSnapshots()
        state.print(2)
    
        // RDD 4: Crucial: look up sessions that are timing out, extract the fields we want to keep and materialize them in Cassandra.
        streamParsed
          .filter(a => a._2.isTimingOut)
          .foreachRDD(rdd =>
            rdd
              .map(stuff => Model(user_key = stuff._1,
                start = stuff._2.start,
                duration = stuff._2.duration,
                pages = stuff._2.pages,
                events = stuff._2.events))
              .saveToCassandra(keyspaceName, tableName)
          )
    
        // add a listener hook that we can use to gracefully shutdown the app and materialize the RDD containing the state snapshots.
        var listener = new Thread(new Listener(ssc, state))
    
        listener.start()
    
        ssc.start()
        ssc.awaitTermination()
    
      }
    }
    

    But when running this (starting the application, waiting a few minutes to build up some state information, and then entering the key "D"), I get the exception below. So once the ssc has started I cannot do anything "new" with a DStream. I was hoping to move from the DStream RDD to a regular RDD, exit the streaming context, and finish by saving the plain RDD, but I don't know how (one possible workaround is sketched after the stack trace). Hope someone can help!
    Exception in thread "Thread-52" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
            at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:222)
            at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
            at org.apache.spark.streaming.dstream.ForEachDStream.<init>(ForEachDStream.scala:34)
            at org.apache.spark.streaming.dstream.DStream.org$apache$spark$streaming$dstream$DStream$$foreachRDD(DStream.scala:687)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply$mcV$sp(DStream.scala:661)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:659)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:659)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
            at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
            at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
            at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:260)
            at org.apache.spark.streaming.dstream.DStream.foreachRDD(DStream.scala:659)
            at main.scala.feaUS.Listener.run(feaUS.scala:119)
            at java.lang.Thread.run(Thread.java:745)
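
    The exception points at the underlying constraint: every DStream output operation has to be part of the graph before ssc.start() is called. A hedged sketch of one way to work within that constraint (this is not taken from the accepted answer below; the saveSnapshot flag and its placement are assumptions) is to register the snapshot-saving foreachRDD up front and only enable it from the shutdown thread:

    import java.util.concurrent.atomic.AtomicBoolean

    // Registered BEFORE ssc.start(), so no new output operation is added to a running context.
    val saveSnapshot = new AtomicBoolean(false)
    state.foreachRDD { rdd =>
      if (saveSnapshot.get()) {
        rdd
          .map(kv => Model(user_key = kv._1,
            start = kv._2.start,
            duration = kv._2.duration,
            pages = kv._2.pages,
            events = kv._2.events))
          .saveToCassandra(keyspaceName, tableName)
      }
    }

    // In the listener thread, instead of calling state.foreachRDD after startup:
    saveSnapshot.set(true)  // the already-registered action persists the next snapshot
    // (it may be necessary to wait roughly one batch interval before stopping so a batch
    // actually runs with the flag enabled)
    ssc.stop(stopSparkContext = true, stopGracefully = true)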
    

    Best Answer

    There are two main changes to the code that should make it work.
    1> Start the Spark streaming context with a checkpointed directory.

    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
          () => createContext(checkpointDirectory));
    

    where the createContext method contains the logic to create and define the new streams, and stores the checkpoint data in checkpointDirectory.
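
    A hedged sketch of what such a createContext factory could look like for this application (the method name comes from the answer; the body and the checkpointDirectory value are assumptions, and the omitted middle would contain the same Kafka/mapWithState/output setup already shown in the full listing):

    def createContext(checkpointDirectory: String): StreamingContext = {
      val conf = new SparkConf().setAppName("cassandra-sessionizing")
      val ssc = new StreamingContext(conf, Seconds(60))
      ssc.checkpoint(checkpointDirectory)
      // ... define the Kafka direct stream, the mapWithState pipeline and every
      // foreachRDD output operation here, exactly as in the main listing above ...
      ssc
    }

    On a cold start the factory runs and the DStream graph is built from scratch; after a restart, getOrCreate recovers the graph, including the mapWithState state, from the checkpoint directory instead.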

    2> The SQLContext needs to be constructed in a slightly different way: it is obtained lazily inside the transform via SQLContext.getOrCreate, so the DStream operations recovered from the checkpoint do not depend on a SQLContext instance created in the driver's main method.
    val streamParsedRaw = kafkaStream
      .map { case (k, v: String) => v } // key is empty, so get the value containing the json string.
      .map(s => s.replaceAll("""(\"hotel_id\")\:\"([0-9]+)\"""", "\"hotel_id\":$2")) // some events contain the hotel_id in quotes, making it a string. remove these quotes.
      .transform { rdd =>
        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
        import sqlContext.implicits._
        val df = sqlContext.read.schema(struct).json(rdd) // apply schema defined above and parse the json into a dataframe,
          .selectExpr("__created_epoch__ AS created_epoch" // the parsed json dataframe needs a bit of type cleaning and name changing
    

    The original question, "Materialize mapWithState stateSnapShots to a database for later resume of the Spark streaming app", can be found on Stack Overflow: https://stackoverflow.com/questions/38541958/
