我有一个紧凑的主题,大约有 30 个 Mio 键。
我的 App
将此主题具体化为 KeyValueStore
.
我如何检查 KeyValueStore
是完全人口?如果我通过 InteractiveQuery
查找 key 我需要知道 key 是否不存在,因为 StateStore
尚未准备好,或者 key 确实不存在。
我以这种方式实现 StateStore:
@Bean
public Consumer<KTable<Key, Value>> process() {
return stream -> stream.filter((k, v) -> v != null,
Materialized.<Key, Value, KeyValueStore<Bytes, byte[]>>as("stateStore")
.withKeySerde(new KeySerde())
.withValueSerde(new ValueSerde()));
}
最佳答案
通常,没有“完全加载”这样的东西,因为在应用程序启动后的任何时间点,新数据可能会写入输入主题,并且会读取这些新数据以更新相应的表。
您可以做的是监控消费者滞后:在您的应用程序中 KafkaStreams#metrics()
允许您访问所有客户端(即消费者/生产者)和 Kafka Streams 指标。消费者公开了一个名为 records-lag-max
的指标。这可能会有所帮助。
当然,在正常的处理过程中(假设一直有新的数据写入输入主题)消费者延迟会一直上下波动。
关于apache-kafka-streams - 检查 StateStore 是否已完全填充,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60640326/