scala - Apache Flink:使用keyBy/connect维护流中的消息输入顺序

标签 scala join stream apache-flink rule-engine

介绍
我正在使用Apache Flink构建一个相当复杂的数据流网络。这个想法是用flink实现一个规则引擎。
作为应用程序的基本描述,这是它应该如何工作的:
数据由kafka使用者源接收,并通过许多数据流进行处理,直到最终将其发送到kafka生产者接收器为止。传入数据包含带有逻辑键(“object-id”)的对象,传入消息可能引用相同的object-id。对于每个给定的对象ID,必须在整个应用程序中保留其传入消息的顺序。整体消息的顺序可以是任意的。
这意味着必须按顺序处理object1的消息a,b和c,但是object2的消息x可以在a1 / b1 / c1之间,之前或之后进行处理。
就我目前的理解而言,这意味着我必须keyBy(_.objectID),以便按到达的顺序处理同一对象的消息。
目前的方法
为了实现实际的规则引擎,创建了一个流网络。这个想法如下:

  • 每个规则将具有1-n个条件
  • 为每个规则的每个条件创建一个带有.filter(_.matches(rule.condition))的原始流的子流
  • 通过使用substream1.connect(substream2).flatMap(new CombineFunction[MyObject](...))
  • 组合对应于同一规则的所有子流
  • connect只能加入2个流,因此具有3个条件的规则将导致随后的2次加入
  • 使用相同条件的
  • 规则将重复使用第二步中创建的相同子流。

  • 这将导致n个加入的流,其中n对应于规则数。加入的流将附加一个map函数,该函数会标记该消息,以便我们知道匹配的规则。
    每个加入/结果流都可以独立于其他结果将其结果(“规则xyz匹配”)发布到kafka生产者,因此在这一点上,我可以将接收器附加到流中。
    连接详细信息
    由于两个流(“条件”-子流)的.connect仅必须传递一条消息,因此,如果两个流(^ =两个条件都匹配)都收到了该消息,那么我需要一个具有键控状态的RichCoFlatMapFunction,它可以处理“仅在另一端已经收到时才通过”。
    然而,的问题是,流由 object-id 设置了键。那么,如果同一对象的2条消息通过网络运行并到达.connect().map(new RichCoFlatMapFunction...),会发生什么情况?这将导致错误的输出。
    在进入网络时,我需要为每个传入消息分配一个唯一的ID(UUID),因此我可以在.connect().map()..连接中使用此密钥(而不是object-id)。
    但是同时,我需要使用object-id对流进行键控,以便按顺序处理相同对象的消息。该怎么办?
    为了解决这个问题,我将输入流保留为keyBy(_.objectID),但流连接中的RichCoFlatMapFunction不再使用键控状态。取而代之的是,我使用一个简单的运算符状态,该状态保留了已传递对象的映射,但是仅通过手动键/值查找即可实现相同的逻辑。
    这似乎可行,但是我不知道这是否会引入更多问题。
    可视化
    flink GUI将渲染此图像,以显示14条规则的列表,总共包含23个条件(某些规则只有一个条件):
    Network of nodes
    enter image description here

    使用以下代码可实现网络的创建:
    val streamCache = mutable.Map[Int,DataStream[WorkingMemory]]()
    val outputNodesCache = ListBuffer[DataStream[WorkingMemory]]()
    
    if (rules.isEmpty)
      return
    
    // create partial streams for all conditions (first level)
    // cache the sub-stream with the hashcode of its condition as key (for re-use)
    
    for (rule <- rules if rule.checks.nonEmpty ;
         cond <- rule.checks if !streamCache.contains(cond.hashCode()))
      streamCache += cond.hashCode -> sourceStream.filter(cond.matches _)
    
    // create joined streams for combined conditions (sub-levels)
    
    for (rule <- rules if rule.checks.nonEmpty)
    {
      val ruleName = rule.ruleID
    
      // for each rule, starting with the rule with the least conditions ...
    
      if (rule.checks.size == 1)
      {
        // ... create exit node if single-condition rule
        // each exit node applies the rule-name to the objects set of matched rules.
    
        outputNodesCache += streamCache(rule.checks.head.hashCode).map(obj => { obj.matchedRule = ListBuffer((ruleName, rule.objectType.mkString(":"), rule.statement)) ; obj })
      }
      else
      {
        // ... iterate all conditions, and join nodes into full rule-path (reusing existing intermediate paths)
    
        var sourceStream:DataStream[WorkingMemory] = streamCache(rule.checks.head.hashCode)
        var idString = rule.checks.head.idString
    
        for (i <- rule.checks.indices)
        {
          if (i == rule.checks.size-1)
          {
            // reached last condition of rule, create exit-node
            // each exit node applies the rule-name to the objects set of matched rules.
    
            val rn = ruleName
            val objectType = rule.objectType.mkString(":")
            val statement = rule.statement
    
            outputNodesCache += sourceStream.map(obj => { obj.matchedRule = ListBuffer((rn, objectType, statement)) ; obj })
          }
          else
          {
            // intermediate condition, create normal intermediate node
    
            val there = rule.checks(i+1)
            val connectStream = streamCache(there.hashCode)
    
            idString += (":" + there.idString)
    
            // try to re-use existing tree-segments
    
            if (streamCache.contains(idString.hashCode))
              sourceStream = streamCache(idString.hashCode)
            else
              sourceStream = sourceStream.connect(connectStream).flatMap(new StatefulCombineFunction(idString))
          }
        }
      }
    }
    
    // connect each output-node to the sink
    
    for (stream <- outputNodesCache)
    {
      stream.map(wm => RuleEvent.toXml(wm, wm.matchedRule.headOption)).addSink(sink)
    }
    
    上一片段中使用的StatefulCombineFunction:
    class StatefulCombineFunction(id:String) extends RichCoFlatMapFunction[WorkingMemory, WorkingMemory, WorkingMemory] with CheckpointedFunction
    {
      @transient
      private var leftState:ListState[(String, WorkingMemory)] = _
      private var rightState:ListState[(String, WorkingMemory)] = _
      private var bufferedLeft = ListBuffer[(String, WorkingMemory)]()
      private var bufferedRight = ListBuffer[(String, WorkingMemory)]()
    
      override def flatMap1(xmlObject: WorkingMemory, out: Collector[WorkingMemory]): Unit = combine(bufferedLeft, bufferedRight, xmlObject, out, "left")
      override def flatMap2(xmlObject: WorkingMemory, out: Collector[WorkingMemory]): Unit = combine(bufferedRight, bufferedLeft, xmlObject, out, "right")
    
      def combine(leftState: ListBuffer[(String, WorkingMemory)], rightState: ListBuffer[(String, WorkingMemory)], xmlObject:WorkingMemory, out: Collector[WorkingMemory], side:String): Unit =
      {
        val otherIdx:Int = leftState.indexWhere(_._1 == xmlObject.uuid)
    
        if (otherIdx > -1)
        {
          out.collect(leftState(otherIdx)._2)
          leftState.remove(otherIdx)
        }
        else
        {
          rightState += ((xmlObject.uuid, xmlObject))
        }
      }
    
      override def initializeState(context:FunctionInitializationContext): Unit = ???
      override def snapshotState(context:FunctionSnapshotContext):Unit = ???
    }
    
    我知道缺少清除操作符状态中的部分匹配项(生存时间)的方法,但这对于当前的开发状态并不重要,将在以后添加。
    背景资料
    该应用程序应使用flink(https://en.wikipedia.org/wiki/Rete_algorithm)实现用于规则匹配的rete-algorithm。
    一种不同的方法是只为每个传入消息循环所有规则,并附加结果。对于使用flink的这种方法,我有一个有效的实现,因此请不要将此作为解决方案。
    问题
    问题是,应用程序在对象ID级别上弄乱了传入消息的顺序。也就是说,它没有达到我在简介中所要求的。对于每个对象ID,传入的消息必须保持顺序。但这种情况并非如此。
    我不知道顺序在代码的哪一点弄乱了,或者这些操作如何在线程之间分配,所以我不知道如何解决这个问题。

    最佳答案

    一些评论...

  • 我想您已经检查了Flink的CEP支持,尤其是Handling Lateness in Event Time。关键概念是您可以依靠事件时间(而不是处理时间)来帮助安排事件,但是您始终必须决定您愿意忍受的最大延迟量是多少(延迟可能是由于源,以及工作流程中发生的所有处理)。
  • 从您提供的Flink作业图中,您似乎正在通过哈希对传入数据进行分区,但是每个规则都需要获取每个传入数据,对吗?因此,在这种情况下,您需要广播。
  • 关于scala - Apache Flink:使用keyBy/connect维护流中的消息输入顺序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53850941/

    相关文章:

    MySQL 多个左连接问题

    MySQL连接,连接表中的空行

    java - Base64InputStream 中只有一个字节可用

    scala selenium dsl 页面对象

    Scala - 如果路径相同,则将多个列表合并为一个列表,直到路径发生变化。 (删除列表中重复的子列表)

    r - 使用精确匹配和模糊匹配在 R 中连接两个大型数据集

    stream - 卡夫卡流 - 如何分组两次?

    java - EOFException 的奇怪行为

    algorithm - 对列表中的相邻元素进行分组

    scala - Scala映射到HashMap