apache-kafka-streams - 如何在固定大小的基于计数的滑动窗口上进行聚合?

标签 apache-kafka-streams

如何使用固定大小的基于计数的窗口实现滑动窗口聚合(或转换)?

例如:如果我有如下流数据

input stream = 1,2,3,4,5,6,7,8...

假设时间在这里无关紧要。并说我的聚合函数是 AVERAGE 并且窗口大小固定为 3 条记录(不是 3 毫秒、3 秒、3 小时等),我希望我的输出流为
output stream = avg(1,2,3), avg(2,3,4), avg(3,4,5), avg(4,5,6), avg(5,6,7)... = 2,3,4,5,6...

Kafka 流工作中记录的 Windows 是“基于时间的”。甚至基类 Window 的构造函数也具有以下签名:
Window(long startMs, long endMs)

所以我不确定它是否是正确的工具 非基于时间 窗口聚合。

Apache Flink 支持 count-based sliding and tumbling windows .这正是我所需要的,但我正在 Kafka Streams 中寻找类似的功能。

最佳答案

如果您不关心时间排序,您可以实现自定义 Transformer与附加状态。

StreamsBuilder builder = new StreamsBuilder();
builder.addStoreStore(...); // add KeyValueStore here
KStream result = builder.stream("topic").transform(...); // pass in name of your KeyValueStore, too

为您定制 Transformer您可以维护一个 List每个键,列表是你的窗口——只要列表小于你的窗口大小,你就将新记录附加到列表中——如果它正好是大小,你就会触发计算——如果它超过了大小,你修剪它并在之后触发计算。

有关更多详细信息,请参阅文档:https://kafka.apache.org/10/documentation/streams/developer-guide/processor-api.html (请注意,ProcessorTransformer 基本上是一回事。)

关于apache-kafka-streams - 如何在固定大小的基于计数的滑动窗口上进行聚合?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49087533/

相关文章:

apache-kafka - 默认情况下,max.poll.intervals.ms设置为int.Max

java - 如何使用 Quarkus 按拓扑启动 Kafka-Streams 管道

apache-kafka - 将消息从一个 Kafka 集群传输到另一个集群

apache-kafka-streams - Kafka Streams 使用最大计数的定时窗口方法

java - 并行 KafkaStream 处理的更好方法?

java - 为什么抑制功能不适用于跳跃窗口?

apache-kafka-streams - Kafka Ktable 查询

apache-kafka - 启动大量 kafka 流会严重加载列表中的第一个 kafka 代理

apache-kafka - 与 KafkaStreams 的窗口结束外连接

apache-kafka - ProducerFencedException 处理 Kafka Stream