java - Kafka 流,分支输出到多个主题

标签 java apache-kafka apache-kafka-streams

在基于 DSL 的转换中,我有一个流-->分支,我希望将分支输出重定向到多个主题。 当前的branch.to()方法仅接受String。 是否有任何带有 stream.branch 的简单选项,我可以将结果路由到多个主题。对于消费者,我可以通过提供字符串数组作为主题来订阅多个主题。

如果特定谓词满足查询,我的问题要求我采取多个操作。

我尝试使用stream.branch[index].to(string),但这不足以满足我的要求。我正在寻找类似 stream.branch[index].to(string array of topic)stream.branch[index].to(string)

我期望 branch.to 方法具有多个主题,或者是否有其他方法可以通过流实现相同的目的?

添加示例代码。删除实际变量名称。

我的谓词

    Predicate <String, MyDomainObject> Predicate1 = new Predicate<String, MyDomainObject>() {
        @Override
        public boolean test(String key, MyDomainObject domObj) {
            boolean result = false;
    if condition on domObj
            return result;
        }
    };
    Predicate <String, MyDomainObject> Predicate2 = new Predicate<String, MyDomainObject>() {
        @Override
        public boolean test(String key, MyDomainObject domObj) {
            boolean result = false;
    if condition on domObj
            return result;
        }
    };

    KStream <String, MyDomainObject>[] branches= myStream.branch(
            Predicate1, Predicate2
    );


// here I need your suggestions.

// this is my current implementation
branches[0].to(singleTopic),
            Produced.with(Serdes.String(), Serdes.serdeFrom(inSer, deSer)));

// I want to send notification to multiple topics. something like below

branches[0].to(topicList),
            Produced.with(Serdes.String(), Serdes.serdeFrom(inSer, deSer)));

最佳答案

如果您知道要将数据发送到哪些主题,则可以执行以下操作:

branches[0].to("first-topic");
branches[0].to("second-topic");
// etc.

关于java - Kafka 流,分支输出到多个主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56706839/

相关文章:

java - 如何使用 Apache Kafka 进行内容过滤?

java - 如何修复Gradle Build中的 “Failed to process kafka-clients-1.1.1.jar”错误

java - hibernate :我错过了什么?找不到@Id,而是@Index

java - 如何在 LibGdx 中扩展自顶向下游戏的背景?

java - 用于kafka实现的python vs java

java - 使用 Stream DSL 在 Kafka Streams 中指定每个流/表时间戳提取器

apache-kafka - 为什么我在检索商店进行查询时偶尔会收到 InvalidStateStoreException PARTITIONS_REVOKED,而不是 RUNNING?

java - Spring和卡夫卡: Join 3 Kafka topics to generate output Kafka streams

java - 将 ArrayList 或数组转换为字符串数组

java - 为什么 java.text.DateFormat 在 Android 上为 en_US 和 en_GB 返回相同的日期格式?