apache-kafka-streams - Kafka Streams - 使用处理器 API 实现连接

标签 apache-kafka-streams

我知道可以使用 dsl api 执行连接。由于各种原因,我们需要使用处理器 api。

如何使用处理器 api 实现加入流。我有一些想法,但不认为它们是正确的。

  1. 具有多个源主题的一个处理器。流程接口(interface)的基础对象,然后转换为流程方法内的正确类型。

  2. 两个处理器,每个处理器都有自己的源主题。每个处理器都获得对其他处理器状态存储的只读访问权限(如果可能的话)。

任何想法 - 我确实在 KStreamImpl 中找到了连接实现,但我无法遵循。也许是关于 dsl 如何做到这一点的解释?

最佳答案

您建议的两种实现方式都是可行的。 Kafka Stream本身使用5个处理器来实现stream-stream join:

source1 ---> "state maintainer 1" --> "joiner 1" ----+
                      |                   |          |
                   updates          "join lookups"   |
                      |                   |          +-----+
                      |            +------+                |
                      v            |                       v
                  "state 1" <------|------+             "merger" -->
                                   |      |                ^
                  "state 2" <------+      |                |
                      ^                   |          +-----+
                      |                   |          |
                   updates          "join lookups"   |
                      |                   |          |
source2 ---> "state maintainer 2" --> "joiner 2" ----+

左右管道对称。两者都有一个“状态维护者”和“加入者”Processor。 “状态维护者”对状态有写入权限。 “Joiner”作为对其他状态的读取访问。最后一步,将两个连接结果流合并在一起。

关于apache-kafka-streams - Kafka Streams - 使用处理器 API 实现连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53485632/

相关文章:

java - 重新分区后,Kafka 流不使用 serde

java - Kafka Streams 任务分配

java - 带有替代项的重载方法值表

apache-kafka-streams - Kafka Stream 使用 JoinWindow 进行数据重放

java - Kafka Stream - 只有一个实例从分区读取

java - 如何在内存中的 Kafka Streams 状态存储上启用缓存

java - 如何在聚合和预处理器中重用状态存储?

apache-kafka - Kafka Stream 在 KTable 值字段上分组

apache-kafka - 在 Kafka 中使用事件携带状态传输方法时如何确保一致性

java - 如何从 SubscriableChannel 构建 KStream