apache-flink - Backpressure in connected Flink streams

Tags: apache-flink

I'm experimenting with how to properly propagate backpressure when I have ConnectedStreams as part of my computation graph. The problem is: I have two sources, and one ingests data faster than the other; think of a scenario where we want to replay some data and one source has rare events that we use to enrich the other source. These two sources are then connected into one stream that expects them to be at least somewhat synchronized, merges them together somehow (making tuples, enriching, ...) and returns the result.

With a single input stream, backpressure is fairly easy to achieve: you simply spend a long time in the processElement function. With ConnectedStreams, my initial idea was to have some logic in each of the process functions that waits for the other stream to catch up. For example, I could have a buffer limited by time span (a span large enough to fit a watermark), and the function would not accept events that would push that span past the threshold. For example:

leftLock.aquire { nonEmptySignal =>
  while (queueSpan() > capacity.toMillis && lastTs() < ctx.timestamp()) {
    println("WAITING")
    nonEmptySignal.await()
  }

  queueOp { queue =>
    println(s"Left Event $value recieved ${Thread.currentThread()}")
    queue.add(Left(value))
  }
  ctx.timerService().registerEventTimeTimer(value.ts)
}

The full code of my example is below (it is written with two locks, assuming access from two different threads, which is not the case - I think):
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.locks.{Condition, ReentrantLock}

import scala.collection.JavaConverters._
import com.google.common.collect.MinMaxPriorityQueue
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.util.Collector

import scala.collection.mutable
import scala.concurrent.duration._

trait Timestamped {
  val ts: Long
}

case class StateObject(ts: Long, state: String) extends Timestamped

case class DataObject(ts: Long, data: String) extends Timestamped

case class StatefulDataObject(ts: Long, state: Option[String], data: String) extends Timestamped

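// Test source: emits one element every `rate` ms; a non-zero `speedUpFactor`
// makes event time run increasingly ahead of wall-clock time, simulating the
// faster / replayed stream.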
class DataSource[A](factory: Long => A, rate: Int, speedUpFactor: Long = 0) extends RichSourceFunction[A] {

  private val max = new AtomicLong()
  private val isRunning = new AtomicBoolean(false)
  private val speedUp = new AtomicLong(0)
  private val WatermarkDelay = 5 seconds

  override def cancel(): Unit = {
    isRunning.set(false)
  }

  override def run(ctx: SourceFunction.SourceContext[A]): Unit = {
    isRunning.set(true)
    while (isRunning.get()) {
      val time = System.currentTimeMillis() + speedUp.addAndGet(speedUpFactor)
      val event = factory(time)
      ctx.collectWithTimestamp(event, time)
      println(s"Event $event sourced $speedUpFactor")

      val watermark = time - WatermarkDelay.toMillis
      if (max.get() < watermark) {
        ctx.emitWatermark(new Watermark(time - WatermarkDelay.toMillis))
        max.set(watermark)
      }
      Thread.sleep(rate)
    }
  }
}

class ConditionalOperator {
  private val lock = new ReentrantLock()
  private val signal: Condition = lock.newCondition()

  def aquire[B](func: Condition => B): B = {
    lock.lock()
    try {
      func(signal)
    } finally {
      lock.unlock()
    }
  }
}

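// Buffers events from both inputs in a timestamp-ordered queue held in keyed state.
// Each processElement tries to block while the buffered event-time span exceeds
// `capacity` and the incoming element is ahead of everything buffered; onTimer
// drains the queue up to the firing timestamp, updating state from left events
// and emitting enriched right events.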
class BlockingCoProcessFunction(capacity: FiniteDuration = 20 seconds)
  extends CoProcessFunction[StateObject, DataObject, StatefulDataObject] {

  private type MergedType = Either[StateObject, DataObject]
  private lazy val leftLock = new ConditionalOperator()
  private lazy val rightLock = new ConditionalOperator()
  private var queueState: ValueState[MinMaxPriorityQueue[MergedType]] = _
  private var dataState: ValueState[StateObject] = _

  override def open(parameters: Configuration): Unit = {
    super.open(parameters)

    queueState = getRuntimeContext.getState(new ValueStateDescriptor[MinMaxPriorityQueue[MergedType]](
      "event-queue",
      TypeInformation.of(new TypeHint[MinMaxPriorityQueue[MergedType]]() {})
    ))

    dataState = getRuntimeContext.getState(new ValueStateDescriptor[StateObject](
      "event-state",
      TypeInformation.of(new TypeHint[StateObject]() {})
    ))
  }

  override def processElement1(value: StateObject,
                               ctx: CoProcessFunction[StateObject, DataObject, StatefulDataObject]#Context,
                               out: Collector[StatefulDataObject]): Unit = {
    leftLock.aquire { nonEmptySignal =>
      while (queueSpan() > capacity.toMillis && lastTs() < ctx.timestamp()) {
        println("WAITING")
        nonEmptySignal.await()
      }

      queueOp { queue =>
        println(s"Left Event $value recieved ${Thread.currentThread()}")
        queue.add(Left(value))
      }
      ctx.timerService().registerEventTimeTimer(value.ts)
    }
  }

  override def processElement2(value: DataObject,
                               ctx: CoProcessFunction[StateObject, DataObject, StatefulDataObject]#Context,
                               out: Collector[StatefulDataObject]): Unit = {
    rightLock.aquire { nonEmptySignal =>
      while (queueSpan() > capacity.toMillis && lastTs() < ctx.timestamp()) {
        println("WAITING")
        nonEmptySignal.await()
      }

      queueOp { queue =>
        println(s"Right Event $value recieved ${Thread.currentThread()}")
        queue.add(Right(value))
      }
      ctx.timerService().registerEventTimeTimer(value.ts)
    }
  }

  override def onTimer(timestamp: Long,
                       ctx: CoProcessFunction[StateObject, DataObject, StatefulDataObject]#OnTimerContext,
                       out: Collector[StatefulDataObject]): Unit = {
    println(s"Watermarked $timestamp")
    leftLock.aquire { leftSignal =>
      rightLock.aquire { rightSignal =>
        queueOp { queue =>
          while (Option(queue.peekFirst()).exists(x => timestampOf(x) <= timestamp)) {
            queue.poll() match {
              case Left(state) =>
                dataState.update(state)
                leftSignal.signal()
              case Right(event) =>
                println(s"Event $event emitted ${Thread.currentThread()}")
                out.collect(
                  StatefulDataObject(
                    event.ts,
                    Option(dataState.value()).map(_.state),
                    event.data
                  )
                )
                rightSignal.signal()
            }
          }
        }
      }
    }
  }

  private def queueOp[B](func: MinMaxPriorityQueue[MergedType] => B): B = queueState.synchronized {
    val queue = Option(queueState.value()).
      getOrElse(
        MinMaxPriorityQueue.
          orderedBy(Ordering.by((x: MergedType) => timestampOf(x))).create[MergedType]()
      )
    val result = func(queue)
    queueState.update(queue)
    result
  }

  private def timestampOf(data: MergedType): Long = data match {
    case Left(y) =>
      y.ts
    case Right(y) =>
      y.ts
  }

  private def queueSpan(): Long = {
    queueOp { queue =>
      val firstTs = Option(queue.peekFirst()).map(timestampOf).getOrElse(Long.MaxValue)
      val lastTs = Option(queue.peekLast()).map(timestampOf).getOrElse(Long.MinValue)
      println(s"Span: $firstTs - $lastTs = ${lastTs - firstTs}")
      lastTs - firstTs
    }
  }

  private def lastTs(): Long = {
    queueOp { queue =>
      Option(queue.peekLast()).map(timestampOf).getOrElse(Long.MinValue)
    }
  }
}

object BackpressureTest {

  var data = new mutable.ArrayBuffer[DataObject]()

  def main(args: Array[String]): Unit = {
    val streamConfig = new Configuration()
    val env = new StreamExecutionEnvironment(new LocalStreamEnvironment(streamConfig))

    env.getConfig.disableSysoutLogging()
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val stateSource = env.addSource(new DataSource(ts => StateObject(ts, ts.toString), 1000))
    val dataSource = env.addSource(new DataSource(ts => DataObject(ts, ts.toString), 100, 100))

    stateSource.
      connect(dataSource).
      keyBy(_ => "", _ => "").
      process(new BlockingCoProcessFunction()).
      print()

    env.execute()
  }
}

The problem with connected streams is that it seems you cannot simply block inside one of the process functions when its stream is too far ahead, because that also blocks the other process function: both processElement1 and processElement2 are invoked by the same task thread, so stalling one stalls the whole operator. On the other hand, if I simply accept all events in this job, the process function will eventually run out of memory, because it buffers the entire portion of the stream that is ahead.

So my question is: is it possible to propagate backpressure separately into each of the streams of a ConnectedStreams, and if so, how? Alternatively, is there another good way to deal with this problem? Perhaps having all the sources communicate somehow so that they stay mostly at the same event time?
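
A minimal sketch of that last idea, assuming all sources run in the same JVM (as they do with the LocalStreamEnvironment above). The EventTimeThrottle helper and its awaitOthers method are made up for illustration; this is not a Flink API:

import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

// Hypothetical helper: every source reports its latest event timestamp and then
// waits while it is more than maxSkewMillis ahead of the slowest registered source.
// Only works when all sources share one JVM; not a built-in Flink mechanism.
object EventTimeThrottle {
  private val progress = new ConcurrentHashMap[String, java.lang.Long]()

  def awaitOthers(sourceId: String, ts: Long, maxSkewMillis: Long): Unit = {
    progress.put(sourceId, ts)
    def slowest: Long = progress.values().asScala.map(_.longValue()).min
    while (ts - slowest > maxSkewMillis) {
      Thread.sleep(10) // back off until the slower source catches up
    }
  }
}

A source's run() loop would call EventTimeThrottle.awaitOthers("data-source", time, 5000) right before collectWithTimestamp, so the fast source stalls instead of running arbitrarily far ahead in event time.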

Best Answer

From my reading of the code in StreamTwoInputProcessor, it looks to me like the processInput() method is responsible for implementing the policy in question. Perhaps one could implement a variant that reads from whichever stream has the lower watermark, as long as it has unread input. Not sure what impact that would have overall, though.
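
For illustration, a sketch of the selection policy described above: read from whichever input currently has the lower watermark, as long as it has buffered input. This is not the actual StreamTwoInputProcessor code; the names below are made up:

// Hypothetical selection policy, not Flink's real input processor:
// prefer the input whose watermark is lower, as long as it has unread input.
object LowerWatermarkFirst {
  sealed trait Input
  case object First  extends Input
  case object Second extends Input

  def nextInput(watermark1: Long, watermark2: Long,
                hasInput1: Boolean, hasInput2: Boolean): Option[Input] =
    (hasInput1, hasInput2) match {
      case (true, true)   => Some(if (watermark1 <= watermark2) First else Second)
      case (true, false)  => Some(First)
      case (false, true)  => Some(Second)
      case (false, false) => None // nothing buffered; wait for more network input
    }
}

Because the slower stream's watermark lags behind, it would be preferred whenever it has data, which keeps the two inputs roughly aligned in event time without blocking the task thread.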

A similar question about apache-flink - Backpressure in connected Flink streams can be found on Stack Overflow: https://stackoverflow.com/questions/48807836/
