apache-flink - NOT followBy 的 Apache Flink CEP 模式操作

标签 apache-flink flink-cep

我有一个场景,如果第二个事件在 x 秒内没有跟随第一个事件,我必须更改状态。例如用户未在 100 分钟内注销,认为他处于无效状态。如何使用当前的模式操作来设计?

最佳答案

由于这已经实现,我想为那些来这里寻找答案的人回答这个问题。

从 Flink 1.0.0 开始,这可以通过处理超时模式来完成,例如,如果您的 CEP 模式是这样的:

示例部分来自 Flink Website (1.2 和 1.3 之间有一些重大变化,请相应调整您的代码,此答案侧重于 1.3)

Pattern description: - Get first event of type "error", followed by a second event event of type "critical" within 10 seconds


Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
.next("middle").where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) throws Exception {
        return value.getName().equals("error");
    }
}).followedBy("end").where(new SimpleCondition<Event>() {
    @Override
    public boolean filter(Event value) throws Exception {
        return value.getName().equals("critical");
    }
}).within(Time.seconds(10));

PatternStream<BAMEvent> patternStream = CEP.pattern(inputStream, pattern)

DataStream<Either<String, String>> result = patternStream.select(new PatternTimeoutFunction<Event, String>() {
  @Override
  public String timeout(Map<String, List<Event>> map, long l) throws Exception {
    return map.toString() +" @ "+ l;
  }
}, new PatternSelectFunction<Event, String>() {

  @Override
  public String select(Map<String, List<Event>> map) throws Exception {
    return map.toString();
  }
});

对于这种情况,如果用户在 100 分钟后仍未注销,则由于相应的事件不会到达,这将导致模式超时,部分事件(启动事件)将在 PatternTimeoutFunction 中捕获。

关于apache-flink - NOT followBy 的 Apache Flink CEP 模式操作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36573715/

相关文章:

spring-boot - 集成 - Apache Flink + Spring Boot

apache-flink - 如何调试Flink中的可序列化异常?

java - 如何使用 flink 运行多个复杂规则

apache-flink - Flink 动态伸缩 1.5

具有特权的JVM中的Java文件setWritable

apache-flink - Flink 中并行度的增加会降低/分散整体吞吐量

amazon-dynamodb - 从flink集群外部访问flink状态有哪些方式?

apache-flink - Flink Process Function 未将数据返回到 Sideoutputstream

apache-flink - 在 Apache Flink 服务器上哪里可以找到我使用 Apache Flink 仪表板提交的 jar

java - Flink奇怪的 "Cannot Serialize operator object class ...CoBroadcastWithNonKeyedOperator"错误