我试图理解有状态
流处理器
。
据我了解,这种类型的流处理器,它使用状态存储
维护某种状态。
我了解到,实现State Store
的方法之一是使用RocksDB
。假设以下拓扑
(并且只有一个处理器有状态
)
A->B->C;处理器 B 有状态,具有本地状态存储和更改日志
启用。我正在使用低级 API。
假设 sp 监听单个 kafka 主题,例如具有 10 个分区的 topic-1
。
我观察到,当应用程序启动时(不同物理机中的 2 个实例,num.stream.threads
= 5),那么对于状态存储
,它会创建目录结构其中
有如下内容:
0_0、0_1、0_2...0_9(每台机器有 5 个,总共 10 个分区)。
我正在浏览一些在线 Material ,其中说我们应该创建一个 StoreBuilder
并使用 addStateStore()
附加它的拓扑而不是在处理器内创建状态存储。
喜欢:
topology.addStateStore(storeBuilder,"processorName")
Ref also: org.apache.kafka.streams.state.Store
我不明白将 storeBuilder 附加到拓扑与实际在处理器内创建状态存储有什么区别。它们之间有什么区别?
第二部分:对于statestore,它创建目录,如:0_0、0_1 等。谁以及如何创建它? kafka 主题(sp 正在监听的主题)和为 State Store
创建的目录数量之间是否存在某种 1:1 映射?
最佳答案
I didn't understand what is the difference in attaching a storeBuilder to topology vs actually creating a statestore within processor. What is the differences between them?
为了让 Kafka Streams 为您管理存储(容错、迁移),Kafka Streams 需要了解存储。因此,您为 Kafka Streams 提供一个 StoreBuilder,Kafka Streams 会为您创建和管理存储。
如果您只是在处理器内创建一个存储,Kafka Streams 不知道该存储,并且该存储不会容错。
For statestore it creates directory like: 0_0, 0_1 etc. Who and how it gets created? Is there some sort of 1:1 mapping between the kafka topics (at which sp is listening) ande the number of directories that gets created for State Store?
是的,有一个映射。该存储在输入主题分区的数量上是共享的。您还可以为每个分区获取一个“任务”,任务目录名为 y_z
,其中 y
为子拓扑编号,z
为分区数字。对于您的简单拓扑,您看到的所有目录只有一个子拓扑,具有相同的 0_
前缀。
因此,您的逻辑存储有 10 个物理分片。当相应的输入主题分区分配给不同的实例时,此分片允许 Kafka Streams 迁移状态。总体而言,您最多可以运行 10 个实例,每个实例将处理一个分区,并托管存储的一个分片。
关于apache-kafka - 具有状态存储的 Kafka 有状态流处理器 : Behind the scenes,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61622414/