java - 如何从kafka流中获取窗口聚合?

标签 java apache-kafka apache-kafka-streams

我有一个事件流,我想根据时间窗口聚合这些事件。我的解决方案提供增量聚合,而不是在定时窗口上提供聚合。我读到这对于流来说是正常的,因为它会将结果作为更改日志给出。同样在研究过程中,我遇到了 2 step windowed aggregation with Kafka Streams DSLHow to send final kafka-streams aggregation result of a time windowed KTable? .但是第一篇文章中的解决方案有些过时(使用不推荐使用的 API)。我使用了那些已弃用的 API 中建议的新 API。这是我的解决方案,

KStream<String, Event> eventKStream = summarizableData.mapValues(v -> v.getEvent());
    KGroupedStream<String, Event> kGroupedStream = eventKStream.groupBy((key, value) -> {
             String groupBy = getGroupBy(value, criteria);
             return groupBy;
    }, Serialized.with(Serdes.String(), eventSerde));


    long windowSizeMs = TimeUnit.SECONDS.toMillis(applicationProperties.getWindowSizeInSeconds());
    final TimeWindowedKStream<String, Event> groupedByKeyForWindow = kGroupedStream
            .windowedBy(TimeWindows.of(windowSizeMs)
                    .advanceBy(windowSizeMs));

但是正如我之前解释的那样,我的结果不是在特定时间窗口内给出的,而是作为增量聚合给出的。我需要我的数据在 windowSize 中指定的时间输出。我还读到 CACHE_MAX_BYTES_BUFFERING_CONFIG 可以控制输出,但我需要适用于每种情况的可靠解决方案。另请注意 https://cwiki.apache.org/confluence/display/KAFKA/Windowed+aggregations+over+successively+increasing+timed+windows 中给出的模式wiki 现在已经过时了,因为它使用旧的 API。 (我使用的是kafka-streams 1.1.0版本)

最佳答案

问题是我的错误。以上,代码示例工作正常。但最后我将 KTable 转换为 KStream。这就是问题所在。转换为 KStream 也会导致输出中间结果。 https://cwiki.apache.org/confluence/display/KAFKA/Windowed+aggregations+over+successively+increasing+timed+windows 中给出的模式工作良好。通过有问题的代码,

// Aggregation

KTable<Windowed<String>, Event> results = groupedByKeyForWindow.aggregate(new AggregateInitiator(), new EventAggregator());

// This converstion causing changelog to output. Instead use next line.
KStream<String, AggregationMessage> aggregationMessageKStream = results.toStream((key, value) -> key.toString())
                .mapValues(this::convertToAggregationMessage).filter((k, v) -> v != null);

// output KTable to sample topic. But this output controlled by 
// COMMIT_INTERVAL_MS_CONFIG and CACHE_MAX_BYTES_BUFFERING_CONFIG parameters. 
// I'm using default values for these params.
results.to(windowedSerde, eventSerde,  "Sample");

关于java - 如何从kafka流中获取窗口聚合?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50904541/

相关文章:

apache-kafka - Prometheus 如何抓取 Kafka 主题?

java - Kafka 流连接中的 RecordTooLargeException

java - 如何在 Jfreechart 中进行鼠标跟踪?

java - 使用 Java 8 从(嵌套列表)列表列表中获取最大和最小总和的列表

apache-kafka - Apache Kafka消费者组和简单消费者

apache-kafka - ClickHouse Kafka 性能

java - 向Word文档添加脚注

Java 到 MySQL : How to update row with a value from variable?

apache-kafka - enable.auto.commit 值未配置为 true

java - 卡夫卡流 : Processor sometimes processes same messages upon application restart