java - FLINK CEP (Java 8) - 通过匹配模式持久化 "identity"

标签 java apache-flink flink-cep

我正在尝试使用 FLINK-CEP 来测量市场中的出价从 BidState.OPENBidState.Closed 所需的时间。我正在接收带有 ID 和状态的出价数据流,就目前情况而言,我将所有“开放”出价与所有“已关闭”出价进行匹配。

我在 patternStream.process 中有一个条件,它只允许将具有相同 ID 的开盘价和收盘价配对,正如它们应该的那样。但这感觉不对,因为匹配的数量以这种方式增长得非常快,而且我感觉这可以通过模式来完成。那么,有没有办法确保“开始”和“结束”对象具有相同的 ID?

AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.noSkip();
//Is it possible to make sure that start.BidID == end.BidID in the pattern?
Pattern<BidEvent, ?> pattern = Pattern.<BidEvent>begin("start", skipStrategy).where(
        new SimpleCondition<BidEvent>() {
            @Override
            public boolean filter(BidEvent value) {
                return value.getState() == BidState.OPENED;
            }
        }).followedByAny("end").where(
        new SimpleCondition<BidEvent>() {
            @Override
            public boolean filter(BidEvent value) throws Exception {
                return value.getState() == BidState.CLOSED; // && value.getBidID == start.getBidID?
            }
        }).within(timeout);

PatternStream<BidEvent> patternStream = CEP.pattern(BidEventDataStream, pattern);

patternStream.process(new PatternProcessFunction<BidEvent, MatchingDuration>() {
    @Override
    public void processMatch(Map<String
            , List<BidEvent>> map
            , Context context
            , Collector<MatchingDuration> collector) {

        BidEvent start = map.get("start").get(0);
        BidEvent end = map.get("end").get(0);
        if (start.getBidId() == end.getBidId()){ // Make sure opening and closing bid is the same. Can this be done in the pattern?
            collector.collect(new MatchingDuration(start.getBidId(), (end.getTimestamp() - start.getTimestamp())));
        }
    }
}).addSink(matchingDurationSinkFunction);

最佳答案

我想出了如何获得我想要的行为:BidEventDataStream 必须键入,以便对具有相同属性的对象进行模式匹配 key 。问题中的代码无需更改,但必须编辑 BidEventDataStream 才能捕获 BidEvent.getBidId():

BidEventDataStream.keyBy(new KeySelector<BidEvent, Long>() {
                    @Override
                    public Long getKey(BidEventvalue) {
                        return value.getBidId();
                    }
                })

关于java - FLINK CEP (Java 8) - 通过匹配模式持久化 "identity",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56348581/

相关文章:

Maven shade-plugin重定位不更新资源文件中的条目

apache-flink - 如何迭代 Flink DataStream 中的每条消息?

java - 如何向flink CEP数据流添加新事件?

java - 将自定义源的数据连续写入flink

java - Android 视频播放器中的方向更改

用于生成和使用生成的决策树的 Java 库

java - ./* 和 ./* 之间有什么区别? :./* 在 java 类路径中

apache-flink - 重置可查询状态

java - Flink CEP 不在事件时间工作,但在处理时间工作

java - Swing中关于 "virtual trees"(自定义TreeModel)的问题