在基于 DSL 的转换中,我有一个流-->分支,我希望将分支输出重定向到多个主题。
当前的branch.to()
方法仅接受String
。
是否有任何带有 stream.branch
的简单选项,我可以将结果路由到多个主题。对于消费者,我可以通过提供字符串数组作为主题来订阅多个主题。
如果特定谓词满足查询,我的问题要求我采取多个操作。
我尝试使用stream.branch[index].to(string)
,但这不足以满足我的要求。我正在寻找类似 stream.branch[index].to(string array of topic)
或 stream.branch[index].to(string)
。
我期望 branch.to
方法具有多个主题,或者是否有其他方法可以通过流实现相同的目的?
添加示例代码。删除实际变量名称。
我的谓词
Predicate <String, MyDomainObject> Predicate1 = new Predicate<String, MyDomainObject>() {
@Override
public boolean test(String key, MyDomainObject domObj) {
boolean result = false;
if condition on domObj
return result;
}
};
Predicate <String, MyDomainObject> Predicate2 = new Predicate<String, MyDomainObject>() {
@Override
public boolean test(String key, MyDomainObject domObj) {
boolean result = false;
if condition on domObj
return result;
}
};
KStream <String, MyDomainObject>[] branches= myStream.branch(
Predicate1, Predicate2
);
// here I need your suggestions.
// this is my current implementation
branches[0].to(singleTopic),
Produced.with(Serdes.String(), Serdes.serdeFrom(inSer, deSer)));
// I want to send notification to multiple topics. something like below
branches[0].to(topicList),
Produced.with(Serdes.String(), Serdes.serdeFrom(inSer, deSer)));
最佳答案
如果您知道要将数据发送到哪些主题,则可以执行以下操作:
branches[0].to("first-topic");
branches[0].to("second-topic");
// etc.
关于java - Kafka 流,分支输出到多个主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56706839/