apache-kafka - Kafka Streams: Define multiple Kafka Streams using Spring Cloud Stream for each set of topics

Tags: apache-kafka apache-kafka-streams spring-kafka spring-cloud-stream spring-cloud-stream-binder-kafka

I am trying to build a simple POC with Kafka Streams, but I get an exception when starting the application. I am using Spring-Kafka, Kafka-Streams 2.5.1 and Spring Boot 2.3.5. The Kafka Streams configuration:

@Configuration
public class KafkaStreamsConfig {
    private static final Logger log = LoggerFactory.getLogger(KafkaStreamsConfig.class);

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processAAA() {
        return input -> input.peek((key, value) -> log
                .info("AAA Cloud Stream Kafka Stream processing : {}", input.toString().length()));
    }

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processBBB() {
        return input -> input.peek((key, value) -> log
                .info("BBB Cloud Stream Kafka Stream processing : {}", input.toString().length()));
    }

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processCCC() {
        return input -> input.peek((key, value) -> log
                .info("CCC Cloud Stream Kafka Stream processing : {}", input.toString().length()));
    }

    /*
    @Bean
    public KafkaStreams kafkaStreams(KafkaProperties kafkaProperties) {
        final Properties props = new Properties();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "groupId-1");
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, JsonNode.class);
        final KafkaStreams kafkaStreams = new KafkaStreams(kafkaStreamTopology(), props);
        kafkaStreams.start();
        return kafkaStreams;
    }

    @Bean
    public Topology kafkaStreamTopology() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(Arrays.asList(AAATOPIC, BBBInputTOPIC, CCCInputTOPIC));
        return streamsBuilder.build();
    } */
}

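The application.yaml configuration is below. The idea is that I have 3 input topics and 3 output topics. The component reads from each input topic and writes its output to the corresponding output topic.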

spring:
  application.name: consumerapp-1
  cloud:
    function:
      definition: processAAA;processBBB;processCCC
    stream:
      kafka.binder: 
          brokers: 127.0.0.1:9092
          autoCreateTopics: true
          auto-add-partitions: true
      kafka.streams.binder:
          configuration: 
            commit.interval.ms: 1000
            default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
            default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      bindings:
        processAAA-in-0:
          destination: aaaInputTopic
        processAAA-out-0:
          destination: aaaOutputTopic
        processBBB-in-0:
          destination: bbbInputTopic
        processBBB-out-0:
          destination: bbbOutputTopic
        processCCC-in-0:
          destination: cccInputTopic
        processCCC-out-0:
          destination: cccOutputTopic
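
The binding names in the YAML above follow the functional naming pattern `<functionName>-in-<index>` and `<functionName>-out-<index>`. A minimal plain-Java sketch of that pattern, with no Spring dependency; the class `BindingNames` and its helper methods are hypothetical names used only for illustration:

```java
import java.util.ArrayList;
import java.util.List;

final class BindingNames {

    /** Input binding name for a function bean, e.g. "processAAA-in-0". */
    public static String inputBinding(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    /** Output binding name for a function bean, e.g. "processAAA-out-0". */
    public static String outputBinding(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    /** Splits a definition string such as "a;b;c" into trimmed function names. */
    public static List<String> functionNames(String definition) {
        List<String> names = new ArrayList<>();
        for (String part : definition.split(";")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // Prints the binding names that the bindings section is expected to contain.
        for (String fn : functionNames("processAAA;processBBB;processCCC")) {
            System.out.println(inputBinding(fn, 0) + " / " + outputBinding(fn, 0));
        }
    }
}
```

Each name printed by `main` should appear as a key under `spring.cloud.stream.bindings`; a key that does not match a function bean name is not wired to that function.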

The exception thrown is:

Caused by: java.lang.IllegalArgumentException: Trying to prepareConsumerBinding public abstract void org.apache.kafka.streams.kstream.KStream.to(java.lang.String,org.apache.kafka.streams.kstream.Produced)  but no delegate has been set.
at org.springframework.util.Assert.notNull(Assert.java:201)
at org.springframework.cloud.stream.binder.kafka.streams.KStreamBoundElementFactory$KStreamWrapperHandler.invoke(KStreamBoundElementFactory.java:134)

Can anyone help me with a Kafka Streams Spring-Kafka code sample that handles multiple input and output topics?

Update: January 21, 2021

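After removing all the kafkaStreams and kafkaStreamsTopology bean configuration, I get the following message in an infinite loop, and message consumption still does not work. I have checked the subscriptions in application.yaml against the @Bean function definitions. They all look fine to me, but I still get this cross-wiring error. I have replaced application.properties with the application.yaml above.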

    [consumerapp-1-75eec5e5-2772-4999-acf2-e9ef1e69f100-StreamThread-1] [Consumer clientId=consumerapp-1-75eec5e5-2772-4999-acf2-e9ef1e69f100-StreamThread-1-consumer, groupId=consumerapp-1] We received an assignment [cccParserTopic-0] that doesn't match our current subscription Subscribe(bbbParserTopic); it is likely that the subscription has changed since we joined the group. Will try re-join the group with current subscription
2021-01-21 14:12:43,336 WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [consumerapp-1-75eec5e5-2772-4999-acf2-e9ef1e69f100-StreamThread-1] [Consumer clientId=consumerapp-1-75eec5e5-2772-4999-acf2-e9ef1e69f100-StreamThread-1-consumer, groupId=consumerapp-1] We received an assignment [cccParserTopic-0] that doesn't match our current subscription Subscribe(bbbParserTopic); it is likely that the subscription has changed since we joined the group. Will try re-join the group with current subscription
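
One reading that fits this warning is that several stream threads joined the same consumer group (`groupId=consumerapp-1`) while subscribing to different topics. The plain-Java sketch below only models that situation and flags the affected group IDs; the `SubscriptionCheck` class and its `Member` type are hypothetical and are not part of Kafka or Spring:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class SubscriptionCheck {

    /** Hypothetical model of one consumer: its group id and subscribed topics. */
    static final class Member {
        final String groupId;
        final Set<String> topics;

        Member(String groupId, Set<String> topics) {
            this.groupId = groupId;
            this.topics = topics;
        }
    }

    /** Returns the group ids whose members do not all subscribe to the same topics. */
    public static Set<String> conflictingGroups(List<Member> members) {
        Map<String, Set<String>> firstSubscription = new LinkedHashMap<>();
        Set<String> conflicts = new TreeSet<>();
        for (Member member : members) {
            // Remember the first subscription seen for each group, compare the rest to it.
            Set<String> previous = firstSubscription.putIfAbsent(member.groupId, member.topics);
            if (previous != null && !previous.equals(member.topics)) {
                conflicts.add(member.groupId);
            }
        }
        return conflicts;
    }

    public static void main(String[] args) {
        // Topic and group names are taken from the log lines above.
        List<Member> shared = List.of(
                new Member("consumerapp-1", Set.of("bbbParserTopic")),
                new Member("consumerapp-1", Set.of("cccParserTopic")));
        System.out.println("Conflicting groups: " + conflictingGroups(shared));
    }
}
```

Under this reading, giving each stream its own group ID (for Kafka Streams, its own application ID) removes the conflict, because every group then has a single, consistent subscription.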

Best answer

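I managed to solve this problem, and I am writing it up for the benefit of others. If you want several streams inside a single application jar, the key is to define multiple application IDs, one per stream. I knew this all along, but I did not know how to define them. I finally dug the answer out of the SCSt documentation. The application.yaml is defined as follows: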

spring:
  application.name: kafkaMultiStreamConsumer
  cloud:
    function:
      definition: processAAA;processBBB;processCCC  # function bean names; read by the functional style, not by @StreamListener
    stream:
      kafka: 
        binder:
          brokers: 127.0.0.1:9092
          min-partition-count: 3
          replication-factor: 2
          transaction:
            transaction-id-prefix: transaction-id-2000
          autoCreateTopics: true
          auto-add-partitions: true
        streams:
          binder:
            functions: 
              # needed for the functional style
              processBBB: 
                application-id: SampleBBBapplication
              processAAA: 
                application-id: SampleAAAapplication
              processCCC: 
                application-id: SampleCCCapplication
            configuration: 
              commit.interval.ms: 1000            
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde        
      bindings:
        # The bindings below are for the imperative style, which uses the
        # @StreamListener and @SendTo annotations in the .java class
        inputAAA:
          destination: aaaInputTopic
        outputAAA:
          destination: aaaOutputTopic
        inputBBB:
          destination: bbbInputTopic
        outputBBB:
          destination: bbbOutputTopic
        inputCCC:
          destination: cccInputTopic
        outputCCC:
          destination: cccOutputTopic
        # Functional style using Function<KStream...>. Use one style or the other;
        # both are not required. If both are present, only one of them works.
        # From what I have seen, @StreamListener is always the one triggered.
        # The bindings below are for the functional style
        processAAA-in-0:
          destination: aaaInputTopic
          group: processAAA-group
        processAAA-out-0:
          destination: aaaOutputTopic
          group: processAAA-group
        processBBB-in-0:
          destination: bbbInputTopic
          group: processBBB-group
        processBBB-out-0:
          destination: bbbOutputTopic
          group: processBBB-group
        processCCC-in-0:
          destination: cccInputTopic
          group: processCCC-group
        processCCC-out-0:
          destination: cccOutputTopic
          group: processCCC-group

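With the above in place, we now need to define the individual Java classes that implement the stream processing logic. Your Java class can look like the one below; create similar ones for the other 2 (or N) streams as required. An example, AAASampleStreamTask.java: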

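import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

@Component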
@EnableBinding(AAASampleChannel.class) // One Channel interface corresponding to in-topic and out-topic
public class AAASampleStreamTask {
    private static final Logger log = LoggerFactory.getLogger(AAASampleStreamTask.class);

    @StreamListener(AAASampleChannel.INPUT)
    @SendTo(AAASampleChannel.OUTPUT)
    public KStream<String, String> processAAA(KStream<String, String> input) {
        input.foreach((key, value) -> log.info("Annotation AAA *Sample* Cloud Stream Kafka Stream processing {}", String.valueOf(System.currentTimeMillis())));
        // ...
        // do other business logic
        // ...
        return input;
    }
    
    /**
     * Use either the method above or the bean below. The functional style below
     * is the newer one, introduced in SCSt 3.0 if I am not mistaken. These are
     * two different styles of consuming Kafka Streams with SCSt. If both are
     * present, the annotated method above takes priority, as far as I have observed.
     */
    /* 
    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processAAA() {
        return input -> input.peek((key, value) -> log.info(
                "Functional AAA *Sample* Cloud Stream Kafka Stream processing : {}", String.valueOf(System.currentTimeMillis())));
       ...
     // do other business logic
       ...
    }
    */
}

If you want to use the imperative style rather than the functional style, you need a channel interface. AAASampleChannel.java:

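import org.apache.kafka.streams.kstream.KStream;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;

public interface AAASampleChannel {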
    String INPUT = "inputAAA";
    String OUTPUT = "outputAAA";

    @Input(INPUT)
    KStream<String, String> inputAAA();

    @Output(OUTPUT)
    KStream<String, String> outputAAA();
}

Regarding apache-kafka - Kafka Streams: Define multiple Kafka Streams using Spring Cloud Stream for each set of topics, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/65792786/