java - Flink CEP 不在事件时间工作,但在处理时间工作

标签 java scala apache-flink complex-event-processing flink-cep

当我使用 Flink CEP 代码来处理时间时(默认配置),我能够获得所需的模式匹配,但在将环境配置为事件时间时,我无法获得任何模式匹配。

 def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.enableCheckpointing(3000) // checkpoint every 3000 msec
     val lines = env.addSource(consumerKafkaSource.consume("bank_transaction_2", "192.168.2.201:9092", "192.168.2.201:2181", "http://192.168.2.201:8081"))

  val eventdate = ExtractAndAssignEventTime.assign(lines, "unix", "datetime", 3) //Extracting date time here

    val event = eventdate.keyBy(v => v.get("customer_id").toString.toInt)
   val pattern1 = Pattern.begin[GenericRecord]("start").where(v=>v.get("state").toString=="FAILED").next("d").where(v=>v.get("state").toString=="FAILED")
      val patternStream = CEP.pattern(event, pattern1)
    val warnID = patternStream.sideOutputLateData(latedata).select(value =>  {
      val v = value.mapValues(c => c.toList.toString)
      Json(DefaultFormats).write(v).replace("\\\"", "\"")
        //.replace("List(","{").replace(")","}")
    })
    val latedatastream = warnID.getSideOutput(latedata)
    latedatastream.print("late_data")


    warnID.print("warning")
    event.print("event")

时间戳提取代码

object ExtractAndAssignEventTime {
  def assign(stream:DataStream[GenericRecord],timeFormat:String,timeColumn:String,OutofOrderTime:Int ):DataStream[GenericRecord] ={
    if(!(timeFormat.equalsIgnoreCase("Unix"))){
      val EventTimeStream=stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[GenericRecord](Time.seconds(3)) {
        override def extractTimestamp(t: GenericRecord): Long = {
          new java.text.SimpleDateFormat(timeFormat).parse(t.get(timeColumn).toString).getTime
        }
      })
      EventTimeStream
    }
    else{
      val EventTimeStream=stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[GenericRecord](Time.seconds(OutofOrderTime)) {
        override def extractTimestamp(t: GenericRecord): Long = {
          (t.get(timeColumn).toString.toLong)
        }
      })
      EventTimeStream
    }
  }

请帮我解决这个问题。提前致谢。!

最佳答案

由于您使用的是AssingerWithPeriodicWatermark,您还需要设置setAutowatermarkInterval,以便Flink使用此间隔来生成水印。

您可以通过调用env.getConfig.setAutoWatermarkInterval([interval])来完成此操作。

对于事件时间CEP,基于水印,因此如果不生成水印,则基本上不会有输出。

关于java - Flink CEP 不在事件时间工作,但在处理时间工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60335612/

相关文章:

java - 验证 Java 中的 url 列表到 POJO?

java - Android 单元测试和类成员可见性

scala - 如何从 Spark SQL DataFrame 中的 MapType 列获取键和值

scala - 多个应用程序被提交到 spark Cluster 并一直等待,然后退出 withError

Scala:从特征实例化时使用 def 还是 val?

java - 使用 Flink 1.2 从 Avro 文件读取数据

elasticsearch - 使用Flink Rich InputFormat创建Elasticsearch的输入格式

java - 如何在 Wildfly 中配置 Jackson?

scala - Flink 自定义分区功能

java - 将条件作为参数传递给迭代器