我有消息想聚合成一个状态,但我也 喜欢在可能更新其他的预处理步骤中使用该状态 状态也是如此。一个示例可能是查看此消息是否更改 状态的某个部分,如果是这样,更新一个跟踪那个的主题 状态的特定部分,以及状态本身。
我能想到的最好的方法是使用一个 Transformer 可以访问状态存储,但在聚合状态之前这样做, 这样我就可以在它被删除之前看到状态的值是什么 更新。 (有问题的 Transformer 只会返回原来的 可能更新额外状态后的消息。)
但是,我遇到了一个先有鸡还是先有蛋的问题:
如果我添加
Materialized
和aggregate()
如下,我得到一个 拓扑错误表明我正在尝试访问状态存储 尚未添加到拓扑中。KStream<String, Message> stream = streamsBuilder.stream(config.getDefaultSourceTopicName(), Consumed.with(Serdes.String(), new MessageSerde())); streamBuilder .transformValues( new MessagePreprocessorSupplier( "state_store_topic_name" ), "state_store_topic_name" ) .groupByKey() .aggregate( () -> null, new MyAggregator(), Materialized.as("state_store_topic_name") );
这引发了:
org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore state_store_topic_name is not added yet. at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.connectProcessorAndStateStore(InternalTopologyBuilder.java:716) at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.connectProcessorAndStateStores(InternalTopologyBuilder.java:615) at org.apache.kafka.streams.kstream.internals.KStreamImpl.transformValues(KStreamImpl.java:546) at org.apache.kafka.streams.kstream.internals.KStreamImpl.transformValues(KStreamImpl.java:538) at myapp.stream_processor.KafkaApplication.configureTopology(KafkaApplication.java:48)
如果我先尝试创建状态存储,然后我可以将它添加到 变压器,但是当我调用
.aggregate()
时出现错误, 表示此时无法添加状态存储,因为 我们之前已经添加了它。Materialized<String, MyState, KeyValueStore<Bytes, byte[]>> myStateStoreProvider = Materialized.<String, Thermostat, KeyValueStore<Bytes, byte[]>>as("state_store_topic_name") .withKeySerde(Serdes.String()) .withValueSerde(myStateSerde); /* really don't think we should need this, but if I don't, the .transformValues says it wasn't added to the topology... */ streamsBuilder.table("state_store_topic_name", myStateStoreProvider); KStream<String, Message> stream = streamsBuilder.stream(config.getDefaultSourceTopicName(), Consumed.with(Serdes.String(), new MessageSerde())); streamBuilder .transformValues( new MessagePreprocessorSupplier( "state_store_topic_name" ), "state_store_topic_name" ) .groupByKey() .aggregate( () -> null, new MyAggregator(), myStateStoreProvider );
这引发了:
org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore state_store_topic_name is already added. at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addStateStore(InternalTopologyBuilder.java:523) at org.apache.kafka.streams.kstream.internals.GroupedStreamAggregateBuilder.build(GroupedStreamAggregateBuilder.java:71) at org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.doAggregate(KGroupedStreamImpl.java:488) at org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.aggregateMaterialized(KGroupedStreamImpl.java:175) at org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.aggregate(KGroupedStreamImpl.java:167) at myapp.stream_processor.KafkaApplication.configureTopology(KafkaApplication.java:48)
最佳答案
如果您回退到 Processor API 以将商店连接到转换器,它可能会起作用:
- 添加转换器而不添加状态(因为它尚不存在)
- 定期添加聚合以创建状态
- 调用
builder.build()
后,调用Topology#connectProcessorAndStateStore()
- 需要传入变压器名称和店铺名称
- 您可以通过
Topology#describe()
检索转换器的名称
虽然我还没有尝试过...
关于java - 如何在聚合和预处理器中重用状态存储?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50518198/