java - Kafka 最近 X 分钟的实时平均值

标签 java apache-kafka apache-kafka-streams

我有一个关于单个经纪人的主题,其中不断传入有关用户点击的数据。我希望能够近乎实时地(例如 1 秒)计算每个用户在过去 X 分钟内的平均点击次数。

我尝试使用 Kafka 流执行此操作,但问题是缩略图窗口无法近乎实时地计算并每秒更新最后 X 分钟内所有值的平均值。跳跃窗口也许适合,但对于 hop=1 秒和大小为 5 分钟的窗口,它将创建 300 个窗口,我想从性能方面来看这太多了。

有没有一种方法可以在没有像 Spark 这样的第三方流引擎的情况下,但使用“普通”Kafka API 来做到这一点? (Kafka 流是可选的)。

非常感谢!

最佳答案

正如评论者所说 - 使用 Kafka Streams ,或KSQL 。 KSQL 运行在 Kafka Streams 之上,因此数据建模和概念(例如窗口和聚合)是相同的。

在 KSQL 中:

ksql> CREATE TABLE USER_CLICKS_PER_MINUTE AS \
SELECT USER_ID, COUNT(*) AS CLICK_COUNT, \
COUNT(*)/5 AS CLICKS_PER_MINUTE \
FROM RATINGS WINDOW HOPPING (SIZE 5 MINUTES, ADVANCE BY 1 SECOND) \
GROUP BY USER_ID;

 Message
---------------------------
 Table created and running
---------------------------

查询有状态聚合:

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss'), USER_ID, \
CLICK_COUNT, CLICKS_PER_MINUTE \
FROM USER_CLICKS_PER_MINUTE \
WHERE USER_ID=18;
2018-05-09 11:44:33 | 18 | 7 | 1
2018-05-09 11:44:34 | 18 | 7 | 1
2018-05-09 11:44:35 | 18 | 7 | 1
2018-05-09 11:44:36 | 18 | 9 | 1
2018-05-09 11:44:37 | 18 | 9 | 1
2018-05-09 11:44:38 | 18 | 10 | 2
2018-05-09 11:44:39 | 18 | 10 | 2
2018-05-09 11:44:40 | 18 | 10 | 2
2018-05-09 11:44:41 | 18 | 12 | 2
2018-05-09 11:44:42 | 18 | 12 | 2
2018-05-09 11:44:43 | 18 | 12 | 2
2018-05-09 11:44:44 | 18 | 12 | 2
2018-05-09 11:44:45 | 18 | 12 | 2
2018-05-09 11:44:46 | 18 | 12 | 2
2018-05-09 11:44:47 | 18 | 12 | 2
2018-05-09 11:44:48 | 18 | 12 | 2
2018-05-09 11:44:49 | 18 | 12 | 2
2018-05-09 11:44:50 | 18 | 12 | 2
2018-05-09 11:44:51 | 18 | 13 | 2
2018-05-09 11:44:52 | 18 | 13 | 2
2018-05-09 11:44:53 | 18 | 13 | 2
2018-05-09 11:44:54 | 18 | 13 | 2
2018-05-09 11:44:55 | 18 | 13 | 2
2018-05-09 11:44:56 | 18 | 13 | 2
2018-05-09 11:44:57 | 18 | 13 | 2
2018-05-09 11:44:58 | 18 | 13 | 2
2018-05-09 11:44:59 | 18 | 13 | 2
2018-05-09 11:45:00 | 18 | 13 | 2
2018-05-09 11:45:01 | 18 | 13 | 2

请记住,当新事件到达时,Kafka Streams 和 KSQL(基于 Kafka Streams 构建)将在给定时间窗口内重新发出聚合。根据您的要求,每 1 秒前进的实际跳跃窗口可能不是您想要的。实时更新的简单滚动窗口仍将为您提供有状态聚合的实时结果。

有关 KSQL 的更多信息:

关于java - Kafka 最近 X 分钟的实时平均值,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50280935/

相关文章:

javax.smartcardio 案例 4 APDU 消失 - 6700 响应 - 警告

java - 使用参数创建新线程 - 线程已创建但不显示数据

apache-kafka - 当代理检测到配额违规(Kafka 配额延迟)时如何 react ?

apache-kafka - Apache Kafka 生产者配置错误

java - Kafka为什么要更改商店名称

java - 如果列表中的最后一个元素比其他元素大,如何删除它?

java - 在 Spring-Boot-Web/Tomcat 中有条件地禁用 JSESSIONID

java - Kafka 的流 API 可以帮助分发数百个分页请求吗?

java - 了解Kafka流groupBy和window

apache-kafka - kafka 本地状态存储/变更日志中的保留时间