我有一个拓扑结构(见下文)可以读取一个非常大的主题(每天超过十亿条消息)。这个 Kafka Streams 应用程序的内存使用率非常高,我正在寻找一些关于如何减少状态存储占用空间的建议(下面有更多详细信息)。 注意:我并不是想逃避 state stores,我只是认为我可能有一种方法可以改进我的拓扑 - 见下文。
// stream receives 1 billion+ messages per day
stream
.flatMap((key, msg) -> rekeyMessages(msg))
.groupBy((key, value) -> key)
.reduce(new MyReducer(), MY_REDUCED_STORE)
.toStream()
.to(OUTPUT_TOPIC);
// stream the compacted topic as a KTable
KTable<String, String> rekeyedTable = builder.table(OUTPUT_TOPIC, REKEYED_STORE);
// aggregation 1
rekeyedTable.groupBy(...).aggregate(...)
// aggreation 2
rekeyedTable.groupBy(...).aggregate(...)
// etc
更具体地说,我想知道将 OUTPUT_TOPIC
作为 KTable 进行流式传输是否会导致状态存储 (REKEYED_STORE
) 大于本地所需的大小。对于具有大量唯一键的变更日志主题,将它们作为 KStream
进行流式处理并进行窗口聚合会更好吗?或者这不会像我认为的那样减少足迹(例如,只有一部分记录——窗口中的记录,将存在于本地状态存储中)。
无论如何,我总是可以启动这个应用程序的更多实例,但我想让每个实例尽可能高效。这是我的问题:
- 对于具有这种吞吐量水平的 Kafka Streams 应用,是否应该考虑任何配置选项、一般策略等?
- 对于单个实例的内存密集程度是否有任何指导方针?即使您的指导方针有些武断,与他人分享也可能会有所帮助。我的一个实例目前使用 15GB 内存 - 我不知道这是好是坏/无关紧要。
如有任何帮助,我们将不胜感激!
最佳答案
用你现在的模式
stream.....reduce().toStream().to(OUTPUT_TOPIC);
builder.table(OUTPUT_TOPIC, REKEYED_STORE)
您会得到两个内容相同的商店。一个用于 reduce()
运算符,一个用于读取 table()
—— 虽然这可以减少到一个存储:
KTable rekeyedTable = stream.....reduce(.);
rekeyedTable.toStream().to(OUTPUT_TOPIC); // in case you need this output topic; otherwise you can also omit it completely
这应该会显着减少您的内存使用量。
关于开窗与非开窗:
这是您所需语义的问题;如此简单地从非窗口化切换到窗口化 reduce 似乎是有问题的。
即使您也可以使用窗口语义,也不一定会减少内存。请注意,在聚合情况下,Streams 不存储原始记录,而仅存储当前聚合结果(即键 + currentAgg)。因此,对于单个键,两种情况的存储要求是相同的(单个窗口具有相同的存储要求)。同时,如果您使用 windows,您实际上可能需要更多内存,因为您获得了一个聚合专业 key pro 窗口(而在非窗口情况下您只获得了一个聚合专业 key )。您可能会节省内存的唯一场景是您的“ key 空间”在很长一段时间内分散的情况。例如,您可能很长时间都得不到某些键的任何输入记录。在非窗口情况下,这些记录的聚合将一直存储,而对于窗口情况,key/agg 记录将被删除,如果以后出现具有此键的记录,将重新创建新条目再次打开(但请记住,在这种情况下你丢失了之前的聚合门——参见(1))
最后但并非最不重要的一点是,您可能想查看应用程序大小调整指南:http://docs.confluent.io/current/streams/sizing.html
关于apache-kafka - Kafka Streams - 减少大型状态存储的内存占用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44770462/