apache-kafka-streams - 检查 StateStore 是否已完全填充

标签 apache-kafka-streams spring-cloud-stream spring-cloud-stream-binder-kafka

我有一个紧凑的主题,大约有 30 个 Mio 键。
我的 App将此主题具体化为 KeyValueStore .

我如何检查 KeyValueStore是完全人口?如果我通过 InteractiveQuery 查找 key 我需要知道 key 是否不存在,因为 StateStore尚未准备好,或者 key 确实不存在。

我以这种方式实现 StateStore:


  @Bean
  public Consumer<KTable<Key, Value>> process() {
    return stream -> stream.filter((k, v) -> v != null,
        Materialized.<Key, Value, KeyValueStore<Bytes, byte[]>>as("stateStore")
            .withKeySerde(new KeySerde())
            .withValueSerde(new ValueSerde()));
  }

最佳答案

通常,没有“完全加载”这样的东西,因为在应用程序启动后的任何时间点,新数据可能会写入输入主题,并且会读取这些新数据以更新相应的表。

您可以做的是监控消费者滞后:在您的应用程序中 KafkaStreams#metrics()允许您访问所有客户端(即消费者/生产者)和 Kafka Streams 指标。消费者公开了一个名为 records-lag-max 的指标。这可能会有所帮助。

当然,在正常的处理过程中(假设一直有新的数据写入输入主题)消费者延迟会一直上下波动。

关于apache-kafka-streams - 检查 StateStore 是否已完全填充,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60640326/

相关文章:

java - Kafka dsl api 开发人员指南中的问题

jmx - 如何为自定义处理器定义新指标(并使它们在 jconsole 中可用)?

java - Spring Cloud Stream Service-Bus Binder的错误 channel

microservices - 微服务消息传递数据库分配的标识符

spring-boot - Spring Batch ChunkRequest 抛出 stackOverflow

java - 使用 Spring Cloud Stream 的 Kafka Streams 进程中的 Serde 错误

java - Spring Cloud Stream RabbitMQ Binder - Spring Cloud函数错误处理

java - Spring Cloud Stream 中使用 StreamBridge 的生产者回调

java - 创建主题失败”,"exception":"\norg. apache.kafka.common.errors.UnsupportedVersionException

kubernetes - 在Kubernetes中查询远程状态存储(交互式查询)