java - Dynamic pattern evaluation with Apache Flink

Tags: java apache-flink flink-cep

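I am new to Apache Flink, and I am trying to evaluate patterns on a stream dynamically using Flink CEP. I want to find the users who performed the actions login, addtocart, and logout, and my code is able to detect that pattern. However, if I define multiple patterns, for example by adding login, logout, it fails to detect the second pattern.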

Below is my code.

Action class

public class Action {

    public int userID;
    public String action;

    public Action() {
    }

    public Action(int userID, String action) {
        this.userID = userID;
        this.action = action;
    }

    public int getUserID() {
        return userID;
    }

    public void setUserID(int userID) {
        this.userID = userID;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    @Override
    public String toString() {
        return "Action [userID=" + userID + ", action=" + action + "]";
    }

}

Pattern class

public class Pattern {

    public String firstAction;
    public String secondAction;
    public String thirdAction;

    public Pattern() {

    }

    public Pattern(String firstAction, String secondAction) {
        this.firstAction = firstAction;
        this.secondAction = secondAction;
    }

    public Pattern(String firstAction, String secondAction, String thirdAction) {
        this.firstAction = firstAction;
        this.secondAction = secondAction;
        this.thirdAction = thirdAction;
    }

    public String getFirstAction() {
        return firstAction;
    }

    public void setFirstAction(String firstAction) {
        this.firstAction = firstAction;
    }

    public String getSecondAction() {
        return secondAction;
    }

    public void setSecondAction(String secondAction) {
        this.secondAction = secondAction;
    }

    public String getThirdAction() {
        return thirdAction;
    }

    public void setThirdAction(String thirdAction) {
        this.thirdAction = thirdAction;
    }

    @Override
    public String toString() {
        return "Pattern [firstAction=" + firstAction + ", secondAction=" + secondAction + ", thirdAction=" + thirdAction
                + "]";
    }



}

Main class

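import java.io.IOException;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class CEPBroadcast {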

    public static class PatternEvaluator
            extends KeyedBroadcastProcessFunction<Integer, Action, Pattern, Tuple2<Integer, Pattern>> {

        /**
         * 
         */
        private static final long serialVersionUID = 1L;

        ValueState<String> prevActionState;

        MapStateDescriptor<Void, Pattern> patternDesc;

        @Override
        public void open(Configuration conf) throws IOException {
            prevActionState = getRuntimeContext().getState(new ValueStateDescriptor<>("lastAction", Types.STRING));
            patternDesc = new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));
        }

        @Override
        public void processBroadcastElement(Pattern pattern, Context ctx, Collector<Tuple2<Integer, Pattern>> out)
                throws Exception {

            BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(patternDesc);
            bcState.put(null, pattern);

        }

        @Override
        public void processElement(Action action, ReadOnlyContext ctx, Collector<Tuple2<Integer, Pattern>> out)
                throws Exception {
            Pattern pattern = ctx.getBroadcastState(this.patternDesc).get(null);
            String prevAction = prevActionState.value();

            if (pattern != null && prevAction != null) {

                if (pattern.firstAction.equals(prevAction) && pattern.secondAction.equals(prevAction)
                        && pattern.thirdAction.equals(action.action)) {
                    out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
                } else if (pattern.firstAction.equals(prevAction) && pattern.secondAction.equals(action.action)) {
                    out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
                }
            }

            prevActionState.update(action.action);

        }

    }

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Action> actions = env.fromElements(new Action(1001, "login"), new Action(1002, "login"),
                new Action(1003, "login"), new Action(1003, "addtocart"), new Action(1001, "logout"),
                new Action(1003, "logout"));

        DataStream<Pattern> pattern = env.fromElements(new Pattern("login", "logout"));

        KeyedStream<Action, Integer> actionByUser = actions
                .keyBy((KeySelector<Action, Integer>) action -> action.userID);

        MapStateDescriptor<Void, Pattern> bcStateDescriptor = new MapStateDescriptor<>("patterns", Types.VOID,
                Types.POJO(Pattern.class));

        BroadcastStream<Pattern> bcedPattern = pattern.broadcast(bcStateDescriptor);

        DataStream<Tuple2<Integer, Pattern>> matches = actionByUser.connect(bcedPattern)
                .process(new PatternEvaluator());

        matches.flatMap(new FlatMapFunction<Tuple2<Integer, Pattern>, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void flatMap(Tuple2<Integer, Pattern> value, Collector<String> out) throws Exception {

                if (value.f1.thirdAction != null) {
                    out.collect("User ID: " + value.f0 + ",Pattern matched:" + value.f1.firstAction + ","
                            + value.f1.secondAction + "," + value.f1.thirdAction);
                } else {

                    out.collect("User ID: " + value.f0 + ",Pattern matched:" + value.f1.firstAction + ","
                            + value.f1.secondAction);

                }

            }

        }).print();

        env.execute("CEPBroadcast");

    }

}

If I give it a single pattern to evaluate, it produces the output shown below:

DataStream<Action> actions = env.fromElements(new Action(1001, "login"), new Action(1002, "login"),
                new Action(1003, "login"), new Action(1003, "addtocart"), new Action(1001, "logout"),
                new Action(1003, "logout"));

DataStream<Pattern> pattern = env.fromElements(new Pattern("login", "logout"));

Output: User ID: 1001,Pattern matched:login,logout

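If I give it multiple patterns to evaluate, as shown below, it does not evaluate the second pattern. Please suggest how I can evaluate multiple patterns. Thanks in advance.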

DataStream<Pattern> pattern = env.fromElements(new Pattern ("login","addtocart","logout"),
                new Pattern("login", "logout"));

Output:  User ID: 1003,Pattern matched:login,addtocart,logout

Best answer

This does not work for a couple of reasons:

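(1) Whenever you have a Flink operator with multiple input streams, such as the PatternEvaluator in your application, you have no control over how that operator reads from its inputs. In your case it might fully consume the events from the actions stream before reading the patterns, or vice versa, or it might interleave the two streams. In a sense, you are lucky that it matched anything at all.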

There is no easy fix for this. If you know all of the patterns at compile time (in other words, if they are not actually dynamic), you could use Flink CEP, or MATCH_RECOGNIZE in Flink SQL.

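If you really do need dynamic patterns, you will have to find a way to hold back the actions stream until the patterns have been read. This topic ("side inputs") has been discussed in other questions here on SO; see, for example, How to unit test BroadcastProcessFunction in flink when processElement depends on broadcasted data. (Alternatively, you could adjust your expectations and accept that only actions processed after a pattern has been stored can be matched against that pattern.)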
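One possible way to hold back the actions is to buffer them until a pattern has been stored and then replay them. The following is only a minimal plain-Java sketch of that idea, not Flink code: the class BufferingEvaluator and its methods are hypothetical, and ordinary maps and lists stand in for the keyed state (for example a ListState per user) that a real KeyedBroadcastProcessFunction would use.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a two-input operator; it only models the buffering logic.
class BufferingEvaluator {
    private String[] pattern; // stays null until a pattern has "arrived"
    // Assumption: in Flink this buffer would live in keyed state, one list per user.
    private final Map<Integer, List<String>> buffered = new LinkedHashMap<>();
    private final Map<Integer, String> prevAction = new HashMap<>();
    private final List<String> matches = new ArrayList<>();

    // Called for each action; buffers it while no pattern is known yet.
    void onAction(int userId, String action) {
        if (pattern == null) {
            buffered.computeIfAbsent(userId, k -> new ArrayList<>()).add(action);
            return;
        }
        evaluate(userId, action);
    }

    // Called when the two-step pattern arrives; replays everything buffered so far.
    void onPattern(String first, String second) {
        pattern = new String[] {first, second};
        for (Map.Entry<Integer, List<String>> e : buffered.entrySet()) {
            for (String action : e.getValue()) {
                evaluate(e.getKey(), action);
            }
        }
        buffered.clear();
    }

    // Same two-step check as in the question: previous action, then current action.
    private void evaluate(int userId, String action) {
        String prev = prevAction.put(userId, action); // returns the previous value or null
        if (pattern[0].equals(prev) && pattern[1].equals(action)) {
            matches.add(userId + ":" + pattern[0] + "," + pattern[1]);
        }
    }

    List<String> matches() {
        return matches;
    }
}
```

With this sketch, actions that arrive before the pattern are not lost: feeding login and logout for user 1001 and only then calling onPattern("login", "logout") still records a match for 1001. An unbounded buffer is a simplification; a real job would need to bound it or expire old entries.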

(2) You are using null as the key when storing the patterns:

bcState.put(null, pattern);

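When the second pattern arrives, you overwrite the first pattern with the second one. There is never a moment when both patterns are available for matching.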
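The overwriting can be seen with an ordinary map. This is a small plain-Java illustration rather than Flink code: a HashMap stands in for the broadcast state, and the class and method names are made up for the example.

```java
import java.util.HashMap;
import java.util.Map;

class NullKeyOverwrite {
    // Stand-in for the broadcast state, keyed the same way as in the question.
    static Map<Void, String> storeUnderNullKey(String... patterns) {
        Map<Void, String> state = new HashMap<>();
        for (String p : patterns) {
            state.put(null, p); // every put targets the same (null) key
        }
        return state;
    }

    public static void main(String[] args) {
        Map<Void, String> state = storeUnderNullKey("login,addtocart,logout", "login,logout");
        System.out.println(state.size());    // prints 1
        System.out.println(state.get(null)); // prints login,logout
    }
}
```

Only the pattern that was stored last survives, whichever one that happens to be.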

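To match the input against two different patterns, you will need to modify PatternEvaluator to handle matching both patterns at once. That means storing both patterns in broadcast state, considering both of them in processElement, and keeping a separate prevActionState for each pattern. You will probably want to give the patterns IDs, use those IDs as the keys in broadcast state, and make prevActionState a MapState that is also keyed by pattern ID.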
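The bookkeeping described above can be sketched in plain Java. This is not Flink code: the class MultiPatternMatcher, the pattern IDs "p1" and "p2", and the choice to track a step count per pattern (instead of only the previous action) are assumptions made for illustration. A LinkedHashMap stands in for the broadcast state keyed by pattern ID, and a nested map stands in for a per-user MapState keyed by pattern ID. The sketch also assumes strict contiguity, so any non-matching action restarts a pattern.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class MultiPatternMatcher {
    // Stand-in for broadcast state: pattern ID -> ordered, non-empty list of actions.
    private final Map<String, List<String>> patterns = new LinkedHashMap<>();
    // Stand-in for keyed MapState: user ID -> (pattern ID -> steps matched so far).
    private final Map<Integer, Map<String, Integer>> progressByUser = new HashMap<>();

    // Plays the role of processBroadcastElement: distinct IDs do not overwrite each other.
    void addPattern(String patternId, List<String> steps) {
        patterns.put(patternId, steps);
    }

    // Plays the role of processElement: advances every stored pattern for this
    // user and returns the IDs of the patterns completed by this action.
    List<String> onAction(int userId, String action) {
        Map<String, Integer> progress =
                progressByUser.computeIfAbsent(userId, k -> new HashMap<>());
        List<String> matched = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : patterns.entrySet()) {
            List<String> steps = e.getValue();
            int done = progress.getOrDefault(e.getKey(), 0);
            if (done >= steps.size()) {
                done = 0; // the pattern was replaced by a shorter one
            }
            if (steps.get(done).equals(action)) {
                done++;
            } else {
                // Strict contiguity (an assumption): a mismatch restarts the pattern.
                done = steps.get(0).equals(action) ? 1 : 0;
            }
            if (done == steps.size()) {
                matched.add(e.getKey());
                done = 0;
            }
            progress.put(e.getKey(), done);
        }
        return matched;
    }

    public static void main(String[] args) {
        MultiPatternMatcher m = new MultiPatternMatcher();
        m.addPattern("p1", Arrays.asList("login", "addtocart", "logout"));
        m.addPattern("p2", Arrays.asList("login", "logout"));
        int[] users = {1001, 1002, 1003, 1003, 1001, 1003};
        String[] actions = {"login", "login", "login", "addtocart", "logout", "logout"};
        for (int i = 0; i < users.length; i++) {
            for (String id : m.onAction(users[i], actions[i])) {
                System.out.println("User ID: " + users[i] + ", pattern matched: " + id);
            }
        }
    }
}
```

Run against the actions from the question, with both patterns stored before any action is processed, this reports p2 for user 1001 and p1 for user 1003. In a real job the ordering problem from point (1) still applies, so the patterns must have arrived before the actions they should match.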

Update:

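Keep in mind that when you write a streaming job with the DataStream API, you are not defining an order of execution the way you would in a typical procedural application. Instead, you are describing the topology of a dataflow graph, along with the behavior of the operators embedded in that graph, which will execute the job in parallel.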

Regarding java - Dynamic pattern evaluation with Apache Flink, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/60599336/
