apache-kafka-streams - KStreamWindowAggregate 似乎共享流时间导致窗口过期

标签 apache-kafka-streams

由于窗口过期而丢弃的消息,即使对于该特定键不应关闭窗口

我想对从单个分区主题消耗的消息进行分组,并根据事件时间将这些消息窗口化 30 秒。为了避免立即处理,我调用了抑制方法并使用了 .grace 方法。一旦窗口关闭(30 秒后 + 0 宽限期),我希望将最终结果添加到主题中。我从该主题消费的消息有两个不同的键:300483976 和 300485339。我消费的消息将 eventtime 增加了 10 秒。我读到流时间只会根据增加事件时间的新消息增加。这也是我的经历。但是,我看到的问题如下:

我消耗了 key 300483976 的前 10 条消息。基于方法“KStreamWindowAggregate.process”,我注意到 internalProcessorContext.streamTime() 每次都增加,基于最新消耗的消息。处理完 10 条消息后,最终的 eventtime 现在是 starttime + 300 秒。在那之后, key 300485339 的消息被消耗。所有,但最新的消息被标记为过期并被丢弃,并带有消息“跳过过期窗口的记录。”。似乎 internalProcessorContext.streamTime() 仍然记得第一次运行的最新值,因此丢弃了键为 300485339 的消息。

stream
                .groupByKey(Grouped.with(Serdes.String(), new DataSerde()))
                .windowedBy(
                        TimeWindows.of(Duration.ofSeconds(30))
                                .grace(Duration.ofMillis(0))) // override the default of 24 hours
                .aggregate(Data::new, transform(), materialize())
                .filter((key, value) -> {
                    log.info("agg {} {}", key, value.toString());
                    return true;
                })
                .suppress(
                        Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
                .toStream();

我希望当消息按键(300483976 和 300485339)分组时,流时间不会“共享”。我希望 key 300483976 和 key 300485339 会有单独的窗口。知道出了什么问题吗?我正在使用 kafka-streams 2.1.0 和一个时间戳提取器,它从消息中的一个字段中获取事件时间。

更新

我做了一些额外的测试并改编了一个不使用聚合的例子,但确实显示了与流时间相同的问题:
    @Test
    public void shouldSupportFinalResultsForTimeWindows() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<Windowed<String>, Long> valueCounts = builder
                .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
                .groupBy((String k, String v) -> k, Grouped.with(STRING_SERDE, STRING_SERDE))
                .windowedBy(TimeWindows.of(ofMillis(2L)).grace(ofMillis(1L)))
                .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts").withCachingDisabled());
        valueCounts
                .suppress(untilWindowCloses(unbounded()))
                .toStream()
                .map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v))
                .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long()));
        valueCounts
                .toStream()
                .map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v))
                .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
        final Topology topology = builder.build();
        System.out.println(topology.describe());
        final ConsumerRecordFactory<String, String> recordFactory =
                new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
        try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) {
            driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
            driver.pipeInput(recordFactory.create("input", "k2", "v1", 7L));
            // note this last records sets the streamtime to 7L causing the next messages to be discarded
            driver.pipeInput(recordFactory.create("input", "k1", "v1", 2L));
            driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L));
            driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
            driver.pipeInput(recordFactory.create("input", "k1", "v1", 5L));
            driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
        }
    }

在上面的示例中,第二条消息将流时间设置为 7L,即使消息具有不同的键,也会关闭创建的 0 到 2 窗口。这也会导致下几条消息被丢弃,即使 key 是 k1。因此,从这个示例中可以清楚地看出,没有考虑 key 。如果这实际上是它的设计方式,我想知道这个场景是什么。特别是当我认为一个主题具有不同分区的消息并且一个分区可能具有与其他分区的流时间(源自事件时间)完全不同的消息时,这是很常见的。希望你能对此有所了解??

最佳答案

观察到的行为是设计使然。显然,流时间在所有消息中被跟踪(它不是子流时间)。

您看到的“问题”是,您的输入数据是乱序的(只是输入 key 和 ts):

(k1, 1), (k1, 2), (k1, 3), (k2, 1), (k2, 2), (k3, 3)

时间不是单调递增的,即key为k2的记录键为 k1 的记录乱序.因为您将宽限期设置为零,所以您告诉 Kafka Streams 不允许无序数据(或实际上只有窗口中的一些无序数据)。因此,对于具有交错键但单调递增时间戳的有序数据流,结果只会如您所料):
(k1, 1), (k2, 1), (k1, 2), (k2, 2), (k1, 3), (k3, 3)

如果您有乱序数据,您应该相应地设置高宽限期(零仅适用于有序数据流)。

关于apache-kafka-streams - KStreamWindowAggregate 似乎共享流时间导致窗口过期,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56773310/

相关文章:

apache-kafka - kafka流库是否依赖于底层kafka代理?

java - kafka 流不与动态生成的类一起运行

java - Kafka Streams 本地状态存储

apache-kafka - 为什么Kaka Streams repartition topic的retention.ms默认设置为-1?这不是无限保留重新分区主题中的消息吗?

java - KStream-KStream inner join 抛出java.lang.ClassCastException

java - AVRO 原始类型的 Serde 类

apache-kafka-streams - Tombstone 消息没有从 KTable 状态存储中删除记录?

apache-kafka - 在Processor Api中,当 `DefaultStreamPartitioner`函数中未指定分区器时,是否会应用 `addSink`?

apache-kafka - Kafka Streams - 是否有可能减少由多个聚合创建的内部主题的数量

apache-kafka - kafka是否具有规则引擎的能力?