java - 卡夫卡流DSL : Application lags for windowed aggregation

标签 java apache-kafka apache-kafka-streams

我们有以下用例:

从主题读取(预期吞吐量是一个键每 2 秒记录一次),groupByKey 并执行 30 分钟窗口的窗口聚合,跳跃周期为 1 分钟。 聚合只是附加收到的记录。

当应用程序启动时,一切工作正常,但在后期阶段,当聚合大小增加时,应用程序会变慢并滞后

拓扑:

KStream<String, Foo> numericStream = builder.stream("topic", Consumed.with(Serdes.String(), FooSerde));
static Duration WINDOW_MS = Duration.ofMinutes(30);
static Duration ADVANCE_MS = Duration.ofMinutes(15);


KStream<Windowed<String>, Foo1> windowedStream = numericStream.peek((key, value) -> System.out.println(value.getDateTime()))
        .groupByKey()
        .windowedBy((TimeWindows.of(WINDOW_MS).advanceBy(ADVANCE_MS)).grace(Duration.ofMillis(30)))
        .aggregate(new Initializer<Foo1>() {
            @Override
            public  Foo1 apply() {
                return new Foo1();
            }},
                   (key, value, aggregate) -> {
                       aggregate.append(value);
                       return aggregate;
                   },
                   Materialized.<String, Foo1, WindowStore<Bytes,byte[]>>as("some_name").withValueSerde(Foo1Serde))
        .toStream()
        .peek((key, value) -> System.out.println(" Key: "+key+ " Start: "+getISTTime(key.window().start()) + " End: "+ getISTTime(key.window().end()) +" Count: " + value.getCount() ));

每条记录的大小约为20KB。当聚合大小超过 10MB 左右时,记录的处理时间会超过 2 秒,因此会出现滞后。

COMMIT_INTERVAL_MS_CONFIG 设置为 0,因为状态存储应始终与最新数据包保持同步,并且状态存储会被查询且间隔不同。

  1. 如何消除应用程序的延迟,是否与 RocksDB I/O 操作有关?因为计数操作而不是聚合操作没有任何延迟

  2. 每个主题有 3 个分区,但是具有相同键的记录会转到同一分区,因此线程/多个实例会有帮助吗?

  3. 我们也在考虑在不使用窗口的情况下执行此操作,窗口是否会为较大的聚合造成这种滞后?

最佳答案

  1. 由于您向 RocksDB 写入和读取越来越大的数据,因此可能会减慢处理速度。

  2. 是的,在一个实例中使用三个线程或启动三个实例各一个线程也可能在这种情况下有所帮助。通过您的拓扑和三个分区,处理分布在三个任务上。如果只有一个实例和一个线程,则所有三个任务将由同一线程运行。您可以通过指定一个具有三个线程的实例来进行纵向扩展,也可以通过在不同的计算节点上启动三个实例(每个实例具有一个线程)来进行横向扩展。两个实例之间的设置(一个具有两个线程,另一个具有一个线程)也可以工作。

  3. 如果没有窗口,聚合将永远不会过期,也永远不会从状态存储中删除。因此,状态存储中的数据将无限增长,并且可能会减慢状态存储的速度。

如果您使用交互式查询来查询状态存储,则无需将 COMMIT_INTERVAL_MS_CONFIG 设置为 0,因为交互式查询还会查询状态存储前面的缓存。实际上,将 COMMIT_INTERVAL_MS_CONFIG 设置为零也可能会减慢处理速度,因为它会增加磁盘 I/O,因为您不断地将数据写入磁盘。

关于java - 卡夫卡流DSL : Application lags for windowed aggregation,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57997896/

相关文章:

java - 缓存来自 RMI 的 POJO

java - ActionMessages 如何支持 struts 2 中的国际化?

scala - 以编程方式停止 Alpakka Kafka 流的正确方法

apache-kafka - Flink+Kafka重置检查点和偏移量

docker - Kafka stream.allMetadata()在DOCKER中返回空列表(交互式查询)

java - 反序列化 Avro 序列化 Kafka Stream 的问题

java - Kafka Stream Punctuator 在重建本地存储数据时访问它

java - 执行 SQL 查询后更改摘要

apache-kafka - kafka 消费者 0.9 是否向后兼容?

java - 这里使用 "throws"子句意味着什么,如果不使用怎么办?