java - Apache Flink CEP: how to detect that an event did not occur within x seconds?

Tags: java apache-flink flink-streaming flink-cep

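For example, A should be followed by B within 10 seconds. I know how to detect that this did happen (.next, .within), but I want to send an alert if B never arrives within the window.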

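    import java.util.List;
    import java.util.Map;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.cep.CEP;
    import org.apache.flink.cep.PatternFlatSelectFunction;
    import org.apache.flink.cep.PatternFlatTimeoutFunction;
    import org.apache.flink.cep.PatternStream;
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
    import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    public static void main(String[] args) throws Exception {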
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpointing is required for exactly-once or at-least-once guarantees
        // env.enableCheckpointing(1000);

        final RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
            .setHost("localhost")
            .setPort(5672)
            .setVirtualHost("/")
            .setUserName("guest")
            .setPassword("guest")
            .build();

        final DataStream<String> inputStream = env
            .addSource(new RMQSource<String>(
                connectionConfig,               // config for the RabbitMQ connection
                "cep",                          // name of the RabbitMQ queue to consume
                true,                           // use correlation ids; can be false if only at-least-once is required
                new SimpleStringSchema()))      // deserialization schema to turn messages into Java objects
            .setParallelism(1);                 // non-parallel source is only required for exactly-once

        inputStream.print();

        Pattern<String, ?> simplePattern =
                Pattern.<String>begin("start")
                    .where(new SimpleCondition<String>() {
                        @Override
                        public boolean filter(String event) {
                            return event.equals("A");
                        }
                    })
                    .next("end")
                    .where(new SimpleCondition<String>() {
                        @Override
                        public boolean filter(String event) {
                            return event.equals("B");
                        }
                    });

        PatternStream<String> timedOutPatternStream = CEP.pattern(inputStream, simplePattern.within(Time.seconds(10)));
        OutputTag<String> timedout = new OutputTag<String>("timedout"){};
        SingleOutputStreamOperator<String> timedOutNotificationsStream = timedOutPatternStream.flatSelect(
            timedout,
            new TimedOut<String>(),
            new FlatSelectNothing<String>()
        );
        timedOutNotificationsStream.getSideOutput(timedout).print();

        env.execute("mynotification");
    }

    // The type parameter is named T so that it does not shadow java.lang.String.
    public static class TimedOut<T> implements PatternFlatTimeoutFunction<T, String> {
        @Override
        public void timeout(Map<String, List<T>> pattern, long timeoutTimestamp, Collector<String> out) throws Exception {
            // Called for a partial match (only "A") whose 10-second window has expired.
            out.collect("LATE!");
        }
    }

    public static class FlatSelectNothing<T> implements PatternFlatSelectFunction<T, T> {
        @Override
        public void flatSelect(Map<String, List<T>> pattern, Collector<T> collector) {}
    }

Actual behavior:

publish "A"
(wait 5 seconds)
publish "B"
=> (no alert)

publish "A"
(wait 10 seconds)
=> (no alert, but there should be one)

publish "A"
(wait 10 seconds)
publish "B"
=> "LATE!"

Expected behavior:

publish "A"
(wait 10 seconds)
=> "LATE!"

Best answer

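You can do this with timed-out patterns. Specify a pattern such as "A followed by B within 10 seconds" and check for partial matches that timed out, that is, matches that contain only A. See the Flink CEP documentation on timed-out patterns.

For a complete example, see this training, or go straight to the solution of the exercise.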


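Edit: As of now (Flink < 1.5), processing-time pruning is performed only when an element arrives. So, unfortunately, at least one event (whether it matches or not does not matter) has to arrive after the timeout for the timeout to be triggered. Efforts to improve this can be tracked in this JIRA ticket.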
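
To make that limitation concrete, here is a minimal toy model in plain Java. It is not Flink code and does not reflect Flink's internal implementation. The class `LazyTimeoutModel`, its methods, and the single-partial-match simplification are all assumptions made for illustration. It only mimics the behavior described above: the expiry check runs when an element arrives and at no other time.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (hypothetical, not Flink): timeouts are only checked when an element arrives. */
public class LazyTimeoutModel {
    private static final long WINDOW_MS = 10_000L;

    // Arrival time of the "A" that is still waiting for its "B", or -1 if there is none.
    private long pendingStart = -1L;
    private final List<String> alerts = new ArrayList<>();

    /** Feeds one event that arrives at the given processing time, in milliseconds. */
    public void onEvent(String event, long nowMs) {
        // Pruning happens here and only here: an expired partial match is
        // noticed when the next element shows up, whatever that element is.
        if (pendingStart >= 0 && nowMs - pendingStart >= WINDOW_MS) {
            alerts.add("LATE!");
            pendingStart = -1L;
        }
        if (event.equals("A")) {
            pendingStart = nowMs;   // start a new partial match
        } else {
            // "B" completes the match; any other event breaks the strict
            // contiguity of next(). Either way nothing is pending afterwards.
            pendingStart = -1L;
        }
    }

    public List<String> alerts() {
        return alerts;
    }

    public static void main(String[] args) {
        LazyTimeoutModel inTime = new LazyTimeoutModel();
        inTime.onEvent("A", 0);
        inTime.onEvent("B", 5_000);
        System.out.println("A, B after 5 s:  " + inTime.alerts());  // []

        LazyTimeoutModel silent = new LazyTimeoutModel();
        silent.onEvent("A", 0);
        // No further element arrives, so the expiry check never runs.
        System.out.println("A, then silence: " + silent.alerts());  // []

        LazyTimeoutModel late = new LazyTimeoutModel();
        late.onEvent("A", 0);
        late.onEvent("B", 10_000);
        System.out.println("A, B after 10 s: " + late.alerts());    // [LATE!]
    }
}
```

The three runs mirror the three scenarios in the question: the alert appears only in the last one, because the late "B" is the element that triggers the check.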

Source: the original question on Stack Overflow: https://stackoverflow.com/questions/50356981/
