java - 时间窗口聚合中的 Kafka 流问题

标签 java apache-kafka spring-cloud apache-kafka-streams spring-cloud-stream

我对 KStreams 聚合和窗口有疑问。我想将一条记录聚合到一个记录列表中,只要它落在一个时间窗口内,这些记录就具有相同的键。 我选择了 SessionWindows,因为我必须在 session 中使用移动窗口:假设记录 A 在 10:00:00 到达;然后所有其他具有相同 key 的记录到达 在 10 秒窗口时间内(直到 10:00:10)将落入同一 session ,请记住,如果它在 10:00:03 到达,则窗口将移动到 10:00:13(+10 秒)。

这导致我们有一个从给定键收到的最后一条记录起 +10 秒的移动窗口。

现在的问题是:我想获得最后的聚合结果。我使用 .suppress() 表示我不想要任何中间结果,我只想要窗口关闭时的最后一个结果。这 无法正常工作,因为虽然它不发送任何中间聚合结果,但当时间窗口结束时,我没有得到任何结果。我注意到为了接收它我需要发布另一个 消息进入主题,这在我的情况下是不可能的。

阅读 .suppress() 我得出的结论是它可能不是实现我想要的方法,这就是为什么我的问题是:如何强制关闭窗口并发送最新的聚合计算结果?

@StreamListener(ExtractContractBinding.RECEIVE_PAGE)
@SendTo(ExtractCommunicationBinding.AGGREGATED_PAGES) 
public KStream<String, List<Records>> aggregatePages(KStream<?, Record> input) { 
    input.map(this::getRecord)
            .groupBy(keyOfElement)
            .windowedBy(SessionWindows.with(Duration.ofSeconds(10L)).grace(Duration.ofSeconds(10L)))
            .aggregate(...do stuff...)
            .suppress(Suppressed.untilWindowCloses(unbounded()))
            .toStream()
            .map(this::createAggregatedResult);
}

最佳答案

简而言之,发生这种情况的原因是因为在 KStreams 和大多数其他计算聚合的流处理引擎中,时间基于事件时间工作。

https://kafka.apache.org/0101/documentation/streams#streams_time

换句话说,在新消息到达超出您的时间窗口 + 宽限时间(导致消息迟到)之前,窗口无法关闭。

此外,根据我最近编写的一些单元测试,我倾向于认为第二条消息需要与前一条消息位于同一分区中,以便事件时间向前推进。实际上,当您在生产环境中运行并且大概每秒处理数百条消息时,这变得不明显。

我还要补充一点,您可以实现自定义时间戳提取器,它允许您根据特定消息到达的时间窗口进行细粒度控制。

how can I force the window to close and send the latest aggregated calculated result?

为了最终回答您的问题,在不向源主题发出额外消息的情况下强制关闭时间窗口是不可能的。

关于java - 时间窗口聚合中的 Kafka 流问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56364606/

相关文章:

java - 位移位操作 java - 输出到 8 位而不是 9 位

java - 重写 XML 文件时使用哪种 Java XML 解析方法?

apache-kafka - 为什么在 Kafka REST 代理中进行 Base64 编码/解码?

java - 功能区负载平衡器看不到服务器列表

java - 如何将新文件上传到GCP存储桶?

java - 登录playframework后的页面

java - JAX-WS:Soap-Client - 为什么需要 WSDL 来初始化 stub

docker - Confluent 控制中心使用 "exit 1"定期关闭

java - Spring Kafka 自定义反序列化器

java - Spring Boot、Spring Cloud AWS 和 AWS SQS 不从队列中读取