public class UserEvent {
int userId;
long loginTime;
long jobId;
long jobAttachTime;
long jobdetachTime;
long workTime;
long logoutTime;
long activeTime;
EventType eventType;
}
我有一个应用程序,其中基于用户操作(例如登录、作业附加、作业分离和注销)在 Kafka 主题上发送事件。
每个事件在 UserEvent
对象中都有一些信息以及 userId
和 eventType
,
例如登录事件有loginTime
;
作业附加事件具有属性jobId
、jobAttachTime
。
类似地,Logout 事件具有属性logoutTime
。
我的要求是在收到每个用户的 Logout 事件后将所有这些事件的信息聚合到一个对象中。
这样,在 logout 事件发生后,UserEvent 对象将具有 loginTime
、logoutTime
、计算出的 workTime
、activeTime
等。
如何使用Kafka KStreams 和/或 KTables 实现这一点?
最佳答案
为了聚合用户事件,您需要一个键(例如: session ID)来过滤所有事件中常见的 session 。假设您将 sessionID 附加到每个用户事件,该 ID 对于每个 session 都是唯一的,但对于该 session 期间发生的所有用户事件都是相同的。
可以通过以下方式使用 GroupBy().aggregate()
来实现:(考虑到您具有 session ID 等效属性,该属性可以唯一用作 key )
// Let's say there is a sessionID
KTable<String, UserEvent> userEventSummary = userEvents
.groupBy(event -> event.get("sessionId"))
.aggregate((userEventSummary,userEvent)->{
userEventSummary = userEvent;
if(new!= null){
String loginEvent = new.get("eventType").get("eventName");
if(loginEvent.equals("login")){
userEventSummary.setLoginTime(new.getLoginTime());
}
if(loginEvent.equals("logOut")){
long workTime = Math.abs(userEvent.getLogOutTime()-userEventSummary.getLoginTime());
userEventSummary.setWorkTime(workTime);
userEventSummary.setActiveTime(workTime);
}
}
return userEventSummary;
});
// if default value for logoutTime is 0, filter the user events which don't have logout time yet
KTable<String, UserEvent> loggedOutEventSummary = userEventSummary.filter(event-> event.getLogOutTime()!= 0);
它将返回由具有注销事件的用户事件过滤的每个用户操作的聚合状态。
关于java - 使用 Kafka Streaming 聚合事件数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57361030/