java - 线程 "main"org.apache.kafka.streams.errors.InvalidStateStoreException : 中出现异常

标签 java apache-kafka apache-kafka-streams

我正在尝试访问我在同一个java程序中创建的inMemoryStore。但返回异常为 “线程“main”org.apache.kafka.streams.errors.InvalidStateStoreException 中出现异常:状态存储 storeName 可能已迁移到另一个实例。”

当我使用 permanentKeyValueStore 时,它​​工作正常并且能够创建存储并返回值。

package com.bakdata.streams_store.demo;

import java.util.Collection;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.stre7ams.state.Stores;
import org.apache.kafka.streams.state.StreamsMetadata;

public class InMemoryStore {

public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-id-0001");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    String storeName = "sample";
    KeyValueBytesStoreSupplier stateStore = Stores.inMemoryKeyValueStore(storeName);
    StoreBuilder<KeyValueStore<String, String>> storeBuilder = Stores.keyValueStoreBuilder(stateStore, Serdes.String(), Serdes.String());

    StreamsBuilder builder = new StreamsBuilder();
    builder.addStateStore(storeBuilder);
    KStream<String, String> inputStream = builder.stream("material_test1");
    KafkaStreams streams = new KafkaStreams(builder.build(), props);

    try {
        streams.start();
        Thread.sleep(30000);
    } catch (final Throwable e) {
        System.exit(1);
    }
    final ReadOnlyKeyValueStore<String, String> keyValueStore = streams.store(storeName, QueryableStoreTypes.keyValueStore());
    KeyValueIterator<String, String> range = keyValueStore.all();
    while (range.hasNext()) {
        KeyValue<String, String> next = range.next();
        System.out.println("Key: " + next.key + ", value: " + next.value);
    }
}
}

Exception in thread "main" org.apache.kafka.streams.errors.InvalidStateStoreException: The state store, sample, may have migrated to another instance. at org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:62) at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1067) at com.bakdata.streams_store.demo.InMemoryStore.main(InMemoryStore.java:59)

我希望打印 ReadOnlyStoreQuery 中的值。

最佳答案

流上不能有 StateStore,因为单个键可能有多个值。您需要先将其转换为 KTable (streams.table(...)) 或 GlobalKtable (streams.globalTable(...))。

Kotlin 示例:

val businessObjects = streamsBuilder.globalTable("topic", eventStore("store-name"))

其中 eventStore 是:

fun eventStore(name: String) = Materialized.`as`<String, String>(Stores.inMemoryKeyValueStore(name))
    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.String())

开始直播后:

var store: ReadOnlyKeyValueStore<String, String> = streams.store("store-name", keyValueStore<String, String>())

注意:还有一个接口(interface) KafkaStreams.StateListener 用于流准备就绪时

 override fun onChange(newState: KafkaStreams.State?, oldState: KafkaStreams.State?) =
    Option.fromNullable(newState)
        .filter { REBALANCING == oldState && RUNNING == it }
        .map { store = streams.store("store-name", keyValueStore<String, String>()) }
        .getOrElse { log.info("Waiting for Kafka being in REBALANCING -> RUNNING, but it is $oldState -> $newState") }

或者,您也可以使用

将流转换为 KTable
stream.groupByKey().reduce(...)

如所描述的here .

关于java - 线程 "main"org.apache.kafka.streams.errors.InvalidStateStoreException : 中出现异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57007888/

相关文章:

java - 通过 vnet 对等连接将 Azure 应用程序服务连接到 MongoDB Atlas

java - 如何以数字0退出循环?但现在我有其他问题

java - Kafka在一段时间后停止,失去了broker

java - 是否支持带有连接的接收器和源主题相同的Kafka Stream?

apache-kafka - Kafka 事件携带的状态传输系统是否应该使用 GlobalKTable 进行本地查询来实现?

Java 随机化 -(减号)和 +(加号)

java - while 循环中的条件不起作用?

apache-kafka - Kafka 消费者是否应该从所有分区中消费以获得完整的消息?

apache-kafka - 具有更改日志主题与日志压缩源主题的 Kafka Streams KTable 存储

apache-kafka - 如果 RocksDB 缓存在内存中,为什么要在 Kafka Streams Processor API 中启用记录缓存?