apache-kafka-streams - 为什么我的 Kafka Transformer 的 StateStore 无法访问?

标签 apache-kafka-streams

我正在尝试实现一个 Transformer 类

public class StreamSorterByTimeStampWithDelayTransformer < V > 
    implements Transformer< Long, V, KeyValue< Long, V > >

类的构造函数为每个实例创建一个 StateStore,因此:

this.state_store_name = "state_store_" + this.hashCode();

KeyValueBytesStoreSupplier key_value_store_supplier = Stores.persistentKeyValueStore( state_store_name );
StoreBuilder< KeyValueStore< String, V > > key_value_store_builder = 
    Stores.keyValueStoreBuilder( key_value_store_supplier, Serdes.String(), v_instance.getSerde() );
stream_builder.addStateStore( key_value_store_builder );

Transformer init 方法因此引用了 StateStore:

public void init(ProcessorContext context) {
    this.context = context;
    this.key_value_store = (KeyValueStore< String, V >) context.getStateStore( state_store_name );
    // schedule a punctuate() method every "poll_ms" (wall clock time)
    this.context.schedule( this.poll_ms, PunctuationType.WALL_CLOCK_TIME, 
            (timestamp) -> pushOutOldEnoughEntries() );
}

我想我必须遗漏一个步骤,因为当调用 getStateStore 时,它 导致异常说:

Processor KSTREAM-TRANSFORM-0000000003 has no access to StateStore

我遗漏了什么或做错了什么?

最佳答案

您需要将商店附加到变压器:

stream.transform(..., this.state_store_name);

关于apache-kafka-streams - 为什么我的 Kafka Transformer 的 StateStore 无法访问?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49712540/

相关文章:

apache-kafka-streams - 如何使用处理器 API 访问 DSL 创建的 KTable/GlobalKTable?

apache-kafka-streams - 如何判断 Kafka Streams 应用程序何时处于 "running"状态?

java - 卡夫卡流: How to get the first and the last record of a SessionWindow?

apache-kafka - 如何定义应在一天中的特定时间提前的时间窗口?

apache-kafka - 无法在具有多个主题分区的 Kafka Streams 中重新平衡错误

java - 是否可以对 GlobalKTable 进行 ReKey?

d3.js - Bluemix Kafka 流

java - Kafka Streams SessionWindows 中的节流

apache-kafka - Kafka Streams 一个主题中的多个对象并反序列化

java - 如何转换/ fork Kafka 流并将其发送到特定主题?