java - Spring和卡夫卡: Join 3 Kafka topics to generate output Kafka streams

标签 java apache-kafka apache-kafka-streams spring-kafka

我需要加入 3 个 Kafka 主题。前两个主题 A 和 B 将使用内连接添加,因为消息 key 相同,并生成一个 POJO 与 B 相同的新 Kafka 流。现在有了这个累积流,我需要加入另一个主题 C,并且需要对输出进行分组基于 C 中存在的字段。

到目前为止,我对此有以下方法:

KStream - 前两个主题(A 和 B)的 KStream 内连接 是否可以不在任何主题上发布此累积输出,但仍然可以在下面使用它

KStream - KStream(以上累积流和主题C)

您能否建议一种更好的方法或任何我可以在 java 中查看类似实现的示例。

最佳答案

您可以使用两个连续的联接:

KStream streamAB = streamA.join(streamB, ...);
// either:
KStream streamABC = streamA.selectKey(...) // set to the key as in streamC
                           .join(streamC, ...);
// or:
KStream streamCNew = streamC.selectKey(...); // set to the key as in streamAB
KStream streamABC = streamA.join(streamCnew, ...);
// or:
KStream streamCNew = streamC.selectKey(...); // set to a new join key
KStream streamABC = streamA.selectKey(...) // set to a new join key
                           .join(streamC, ...);

streamABC.selectKey(/* extract grouping field and set as key */).to("outputTopic");

关于java - Spring和卡夫卡: Join 3 Kafka topics to generate output Kafka streams,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55680574/

相关文章:

java - 获取文件的元数据

java - 在 Consumer 之间均匀分布 Kafka 分区

apache-kafka - 如何设置状态存储更改日志主题的 max.message.bytes?

apache-kafka - Kafka Streams stateStores 容错一次?

java - 在 LDAP 服务器中验证用户身份

java - Java AWT HelloWorld 示例的最短路径是什么?

java - 无法使用日语字符对主题进行编码

apache-kafka - 有什么方法可以检查 kafka 是否已从 kafka-net 启动并运行

apache-kafka - 使用列的默认值创建 KSQL 流?

java - 事件计数的窗口聚合