apache-kafka-streams - Spring Cloud Stream 中是否可以有多个@StreamListener?

标签 apache-kafka-streams spring-cloud-stream

我使用 Spring cloud strema Kstream。
我测试一个主题和一个 @StreamListner .没关系。

我修改了两个 KStream 输入的代码。 (两个 @StreamListener)
但是, Spring 云错误..


***************************
APPLICATION FAILED TO START
***************************

Description:

The bean 'stream-builder-process', defined in null, could not be registered. A bean with that name has already been defined in null and overriding is disabled.

Action:

Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true


Process finished with exit code 1

第一听众


    package com.kstream.spring.cloud.test1;

    import static com.kstream.spring.cloud.test1.MyBinding.TOPIC1_IN;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.streams.kstream.KStream;
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.stereotype.Component;

    @Component
    public class Topic1Source {

      @StreamListener
      public void process(@Input(TOPIC1_IN) KStream<String, GenericRecord> logs) {

        logs
            .foreach((key, value) -> {
              System.out.println("Test Topic1 : " + value);
            });
      }
    }


只有 第一听众 没问题。

第二个听众


    package com.kstream.spring.cloud.test1;

    import static com.kstream.spring.cloud.test1.MyBinding.TOPIC2_IN;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.streams.kstream.KStream;
    import org.springframework.cloud.stream.annotation.Input;
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.stereotype.Component;

    @Component
    public class Topic2Source {

      @StreamListener
      public void process(@Input(TOPIC2_IN) KStream<String, GenericRecord> logs) {

        logs
            .foreach((key, value) -> {
              System.out.println("Test Topic2 : " + value);
            });
      }
    }


但这是错误

应用程序属性
spring.application.name=kafka-streams-test
spring.kafka.bootstrap-servers=my brokers

# defaults
spring.cloud.stream.kafka.streams.binder.brokers=my brokers
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url=my server


# topic1
spring.cloud.stream.bindings.topic1In.destination=topic1
spring.cloud.stream.bindings.topic1In.consumer.useNativeDecoding=true
spring.cloud.stream.bindings.topic1In.consumer.header-mode=raw
spring.cloud.stream.kafka.streams.bindings.topic1In.consumer.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.topic1In.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde




# topic2
spring.cloud.stream.bindings.topic2In.destination=topic2
spring.cloud.stream.bindings.topic2In.consumer.useNativeDecoding=true
spring.cloud.stream.bindings.topic2In.consumer.header-mode=raw
spring.cloud.stream.kafka.streams.bindings.topic2In.consumer.keySerde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.bindings.topic2In.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

最佳答案

我找到错误原因。
因为我定义了两个相同的“进程”方法名称。

关于apache-kafka-streams - Spring Cloud Stream 中是否可以有多个@StreamListener?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55136863/

相关文章:

apache-kafka - 具有多个输出主题的 Kafka 流拓扑的并发性

java - 是否可以禁用 spring cloud stream starter 应用程序的安全性?

spring-boot - 消息调度异常 : Dispatcher has no subscribers

java - Spring Cloud Stream kafka聚合使用 Autowiring bean的新实例

apache-kafka-streams - Kafka 流 DSL : aggregate, 丰富并发送

apache-kafka - Kafka Streams - 跳跃窗口 - 去重 key

apache-kafka - 使用 Kafka Streams 在输出中设置时间戳

unit-testing - Kafka Streams 测试 : java. util.NoSuchElementException:未初始化的主题: "output_topic_name"

spring-xd - Spring Cloud 数据流中的 Kafka 源

spring-cloud-stream - Spring Cloud sleuth 与 Spring Cloud Stream