java - 在 kstreams 应用程序中使用自定义 Kafka 状态存储

标签 java apache-kafka apache-kafka-streams spring-cloud-stream spring-cloud-stream-binder-kafka

我们正在使用包含在 spring cloud stream Hoxton RC7 项目中的 Kafka-streams(因此使用提供的 Kafka-streams 和 Kafka-client 版本 [2.3.1])


ext {
    set('springCloudVersion', 'Hoxton.SR7')
}
...

dependencies {
    // spring cloud stream
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
    implementation("org.springframework.cloud:spring-cloud-stream")
    // redis 
    implementation 'io.lettuce:lettuce-core'
    implementation 'org.springframework.data:spring-data-redis'
    testCompile 'it.ozimov:embedded-redis:0.7.2'
    ...

我们已经实现了一个 kstreams 应用程序

@Bean
public Consumer<KStream<String, IncomingEvent>> process() {

    return input -> {

我们在其中进行一些聚合,例如:

.aggregate(Foo::new, (key, value1, aggregate) ->
                (aggregate == null || aggregate.getLastModified() == null || this.mustProcess(key, value1))
                        ? value1
                        : aggregate,
        materialized

)

现在物化应该是一个自定义的外部状态存储(Redis):

Materialized<String, Foo, KeyValueStore<Bytes, byte[]>> materialized =
        Materialized.as("redis-store");

由 StoreBuilder Bean 提供:

@Bean
public StoreBuilder<KeyValueStore<String, Foo>> builder(RedisKeyValueStoreBytes redisKeyValueStoreBytes){
    return Stores.keyValueStoreBuilder(supplier(redisKeyValueStoreBytes),
            new Serdes.StringSerde(),
            new SomeFooSerde());
}


public static KeyValueBytesStoreSupplier supplier(RedisKeyValueStoreBytes redisKeyValueStoreBytes) {

    return new KeyValueBytesStoreSupplier() {
        @Override
        public String name() {
            return "redis-store";
        }

        @Override
        public KeyValueStore<Bytes, byte[]> get() {
            return redisKeyValueStoreBytes;
        }

        @Override
        public String metricsScope() {
            return "redis-session-state";
        }
    };
}

我现在使用 EmbeddedKafka 测试应用程序:

@ActiveProfiles("test")
@RunWith(SpringRunner.class)
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@SpringBootTest(classes = {TestConfigurationTests.class})
@EmbeddedKafka(count = 3, ports = {29901, 29902, 29903}, zookeeperPort = 33991)
public class TestKafkaIntegration {

我尝试访问状态存储并查询添加的项目的位置:

ReadOnlyKeyValueStore<String, Foo> queryableStore = interactiveQueryService.getQueryableStore(
        "redis-store", QueryableStoreTypes.keyValueStore());
return queryableStore;

但是当我运行我的测试时,我收到一个错误:

Caused by: org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore redis-store is already added.

几个问题:

  • [1] 中解释的使用自定义状态存储的示例在处理器中使用它。这是否意味着我无法在聚合中使用自定义状态存储?
  • 当无法在聚合中使用它时,无论如何使用自定义状态存储有什么意义?
  • 当我稍微更改上面的 kstreams 代码并定义一个处理器而不是在聚合方法中使用物化时,错误发生变化,然后它在尝试执行 getQueryableStore 时提示缺少状态“redis-store”存储。但实际上我可以看到,addStateStoreBeans 注册了“redis-store”。怎么会这样?

我想使用自定义状态存储的原因是,我(真的很容易)无法为应用程序实例提供专用硬盘。为了让应用程序快速启动,我想避免在每次启动应用程序时处理完整的变更日志(最好每天进行几次,目前需要一个多小时)。那么现在最后一个问题:

  • 使用自定义外部状态存储时,我能否在应用程序重启时恢复到上次状态?

[1] https://spring.io/blog/2019/12/09/stream-processing-with-spring-cloud-stream-and-apache-kafka-streams-part-6-state-stores-and-interactive-queries

最佳答案

您正在使用 Materialized.as(java.lang.String storeName)这将创建(具体化)一个具有给定名称(此处为“redis-store”)的 StateStore。另一方面,使用 builder(RedisKeyValueStoreBytes redisKeyValueStoreBytes) 您正在创建 another StateStore 具有相同的名称,springframework 可能会自动将其添加到拓扑结构,这样您就会收到“商店已添加”错误。

q1:您可以在聚合中使用自定义状态存储;与 Materialized.as(KeyValueBytesStoreSupplier supplier) 一起使用

q2:还可以使用带有转换器或自定义处理器的 StateStore 进行交互式查询;也可以通过 global StateStore 访问整个主题而不是 KafkaStreams仅实例分配的分区(参见 addGlobalStoreglobalTable )

q3:我猜你没有(手动)在拓扑中注册状态存储;见Topology.addStateStore(StoreBuilder<?> storeBuilder, java.lang.String... processorNames)Connecting Processors and State Stores

q4:是的,state store is loaded from a changelog topic (可能是使用 optimizations 时的原始主题)

关于java - 在 kstreams 应用程序中使用自定义 Kafka 状态存储,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63315492/

相关文章:

java - 与字符串实习相关

mysql - Kafka生产者与Kafka连接读取MySQL数据源

docker - Kafka-从命令行生成时出错(字符('<'(代码60)):应为有效值)

scala - 如何将Java api KStream转换为Scala api KStream?

apache-kafka - 如何在两个 Kafka 流之间使用持久化的 StateStore

apache-kafka - Kafka Streams 聚合阶段是否序列化和反序列化每个单个元素?

java - 如何将用户输入合并到评分系统中(JFrame)

java - 子类和父类中的方法重写可以有不同数量的参数?参数数量可以不同吗?

hadoop - 如何在 hadoop 配置中使用亚马逊实例的公共(public) IP?

java - 找不到符号错误