我需要加入 3 个 Kafka 主题。前两个主题 A 和 B 将使用内连接添加,因为消息 key 相同,并生成一个 POJO 与 B 相同的新 Kafka 流。现在有了这个累积流,我需要加入另一个主题 C,并且需要对输出进行分组基于 C 中存在的字段。
到目前为止,我对此有以下方法:
KStream - 前两个主题(A 和 B)的 KStream 内连接 是否可以不在任何主题上发布此累积输出,但仍然可以在下面使用它
KStream - KStream(以上累积流和主题C)
您能否建议一种更好的方法或任何我可以在 java 中查看类似实现的示例。
最佳答案
您可以使用两个连续的联接:
KStream streamAB = streamA.join(streamB, ...);
// either:
KStream streamABC = streamA.selectKey(...) // set to the key as in streamC
.join(streamC, ...);
// or:
KStream streamCNew = streamC.selectKey(...); // set to the key as in streamAB
KStream streamABC = streamA.join(streamCnew, ...);
// or:
KStream streamCNew = streamC.selectKey(...); // set to a new join key
KStream streamABC = streamA.selectKey(...) // set to a new join key
.join(streamC, ...);
streamABC.selectKey(/* extract grouping field and set as key */).to("outputTopic");
关于java - Spring和卡夫卡: Join 3 Kafka topics to generate output Kafka streams,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55680574/