apache-kafka - Kafka Streams - 减少大型状态存储的内存占用

标签 apache-kafka apache-kafka-streams

我有一个拓扑结构(见下文)可以读取一个非常大的主题(每天超过十亿条消息)。这个 Kafka Streams 应用程序的内存使用率非常高,我正在寻找一些关于如何减少状态存储占用空间的建议(下面有更多详细信息)。 注意:我并不是想逃避 state stores,我只是认为我可能有一种方法可以改进我的拓扑 - 见下文。

// stream receives 1 billion+ messages per day
stream
    .flatMap((key, msg) -> rekeyMessages(msg))
    .groupBy((key, value) -> key)
    .reduce(new MyReducer(), MY_REDUCED_STORE)
    .toStream()
    .to(OUTPUT_TOPIC);

// stream the compacted topic as a KTable
KTable<String, String> rekeyedTable = builder.table(OUTPUT_TOPIC, REKEYED_STORE);


// aggregation 1
rekeyedTable.groupBy(...).aggregate(...)

// aggreation 2
rekeyedTable.groupBy(...).aggregate(...)

// etc

更具体地说,我想知道将 OUTPUT_TOPIC 作为 KTable 进行流式传输是否会导致状态存储 (REKEYED_STORE) 大于本地所需的大小。对于具有大量唯一键的变更日志主题,将它们作为 KStream 进行流式处理并进行窗口聚合会更好吗?或者这不会像我认为的那样减少足迹(例如,只有一部分记录——窗口中的记录,将存在于本地状态存储中)。

无论如何,我总是可以启动这个应用程序的更多实例,但我想让每个实例尽可能高效。这是我的问题:

  • 对于具有这种吞吐量水平的 Kafka Streams 应用,是否应该考虑任何配置选项、一般策略等?
  • 对于单个实例的内存密集程度是否有任何指导方针?即使您的指导方针有些武断,与他人分享也可能会有所帮助。我的一个实例目前使用 15GB 内存 - 我不知道这是好是坏/无关紧要。

如有任何帮助,我们将不胜感激!

最佳答案

用你现在的模式

stream.....reduce().toStream().to(OUTPUT_TOPIC);
builder.table(OUTPUT_TOPIC, REKEYED_STORE)

您会得到两个内容相同的商店。一个用于 reduce() 运算符,一个用于读取 table() —— 虽然这可以减少到一个存储:

KTable rekeyedTable  = stream.....reduce(.);
rekeyedTable.toStream().to(OUTPUT_TOPIC); // in case you need this output topic; otherwise you can also omit it completely

这应该会显着减少您的内存使用量。

关于开窗与非开窗:

  1. 这是您所需语义的问题;如此简单地从非窗口化切换到窗口化 reduce 似乎是有问题的。

  2. 即使您也可以使用窗口语义,也不一定会减少内存。请注意,在聚合情况下,Streams 不存储原始记录,而仅存储当前聚合结果(即键 + currentAgg)。因此,对于单个键,两种情况的存储要求是相同的(单个窗口具有相同的存储要求)。同时,如果您使用 windows,您实际上可能需要更多内存,因为您获得了一个聚合专业 key pro 窗口(而在非窗口情况下您只获得了一个聚合专业 key )。您可能会节省内存的唯一场景是您的“ key 空间”在很长一段时间内分散的情况。例如,您可能很长时间都得不到某些键的任何输入记录。在非窗口情况下,这些记录的聚合将一直存储,而对于窗口情况,key/agg 记录将被删除,如果以后出现具有此键的记录,将重新创建新条目再次打开(但请记住,在这种情况下你丢失了之前的聚合门——参见(1))

最后但并非最不重要的一点是,您可能想查看应用程序大小调整指南:http://docs.confluent.io/current/streams/sizing.html

关于apache-kafka - Kafka Streams - 减少大型状态存储的内存占用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44770462/

相关文章:

apache-kafka - 使用单个 kafka 流从多个主题消费

java - 使用 Java API 定期获取 Kafka Producer 指标

scala - 如何向 Kafka Streams 中的流添加冷却/速率限制?

java - KStream mapValues 和 transformValues 之间的区别

java - 如何提取 Kafka Streams 消息中嵌入的时间戳

apache-kafka - 卡夫卡流异常 : GroupAuthorizationException

java - 以编程方式(JAVA)从 Kafka 主题创建/删除分区

java - JSON对象: How to map的Kafka流消费者

apache-kafka - Ktable Ktable 连接示例

docker - 我们可以使用在生产中使用docker化的Apache Kafka吗?