java - 使用 Kafka Streaming 聚合事件数据

标签 java apache-kafka apache-kafka-streams

public class UserEvent {
    int userId;
    long loginTime;
    long jobId;
    long jobAttachTime;
    long jobdetachTime;
    long workTime;
    long logoutTime;
    long activeTime;
    EventType eventType;
}

我有一个应用程序,其中基于用户操作(例如登录、作业附加、作业分离和注销)在 Kafka 主题上发送事件。 每个事件在 UserEvent 对象中都有一些信息以及 userIdeventType, 例如登录事件有loginTime作业附加事件具有属性jobIdjobAttachTime。 类似地,Logout 事件具有属性logoutTime。 我的要求是在收到每个用户的 Logout 事件后将所有这些事件的信息聚合到一个对象中。 这样,在 logout 事件发生后,UserEvent 对象将具有 loginTimelogoutTime、计算出的 workTimeactiveTime 等。 如何使用Kafka KStreams 和/或 KTables 实现这一点?

最佳答案

为了聚合用户事件,您需要一个键(例如: session ID)来过滤所有事件中常见的 session 。假设您将 sessionID 附加到每个用户事件,该 ID 对于每个 session 都是唯一的,但对于该 session 期间发生的所有用户事件都是相同的。

可以通过以下方式使用 GroupBy().aggregate() 来实现:(考虑到您具有 session ID 等效属性,该属性可以唯一用作 key )

    // Let's say there is a  sessionID
KTable<String, UserEvent> userEventSummary = userEvents
                                  .groupBy(event -> event.get("sessionId"))
                                  .aggregate((userEventSummary,userEvent)->{
                                        userEventSummary = userEvent;
                                        if(new!= null){
                                            String loginEvent = new.get("eventType").get("eventName");
                                            if(loginEvent.equals("login")){
                                                userEventSummary.setLoginTime(new.getLoginTime());
                                            }
                                            if(loginEvent.equals("logOut")){
                                                long workTime = Math.abs(userEvent.getLogOutTime()-userEventSummary.getLoginTime());
                                                userEventSummary.setWorkTime(workTime);
                                                userEventSummary.setActiveTime(workTime);
                                            }
                                        }
                                        return userEventSummary;
                                   });

// if default value for logoutTime is 0, filter the user events which don't have logout time yet
KTable<String, UserEvent>  loggedOutEventSummary = userEventSummary.filter(event-> event.getLogOutTime()!= 0);

它将返回由具有注销事件的用户事件过滤的每个用户操作的聚合状态。

关于java - 使用 Kafka Streaming 聚合事件数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57361030/

相关文章:

apache-kafka - 如何从 KTable 中获取排序后的 KeyValueStore?

apache-kafka-streams - 如何在固定大小的基于计数的滑动窗口上进行聚合?

java - 在Java中遍历字符串字符的最简单/最好/最正确的方法是什么?

java - 如何实现: interface MySortedCollection<T extends Comparable<T>>

java - KafkaConsumer.commitSync() 实际提交了什么?

java - Quarkus + Kafka + Smallrye 异常处理

ssl - 使用 ssl+acl 配置的 Console Producer 中的 Leader 不可用 Kafka

java - 如何在 Kafka Streams 应用程序中处理偏移提交期间的超时异常

java - 调用 stub 而不是原始方法

java - 删除字符串中的反斜杠