spring-boot - SpringBoot微服务@StreamListener抛出RunTimeException时无限次重试

标签 spring-boot apache-kafka spring-cloud-stream

我有一个 @StreamListener 方法,它将执行 REST 调用。当REST调用返回异常时,@StreamListener方法将抛出RunTimeException并执行重试。 @StreamListener方法抛出RuntimeException时将无限次重试

Spring Cloud Stream重试配置:

spring.cloud.stream.kafka.bindings.inputChannel.consumer.enableDlq=true
spring.cloud.stream.bindings.inputChannel.consumer.maxAttempts=3
spring.cloud.stream.bindings.inputChannel.consumer.concurrency=3
spring.cloud.stream.bindings.inputChannel.consumer.backOffInitialInterval=300000
spring.cloud.stream.bindings.inputChannel.consumer.backOffMaxInterval=600000

SpringBoot微服务依赖版本:

Spring Boot 2.0.3
Spring Cloud Stream Elmhurst.RELEASE
Kafka broker 1.1.0

最佳答案

使用RetryTemplate或增加maxAttempts属性有一个限制,即重试必须在max.poll.interval.ms内完成,否则Kafka Broker会认为消费者已关闭并将分区重新分配给另一个消费者(如果有的话)。

另一个选项是让监听器使用 consumer.seek 方法从 Kafka 重新读取相同的消息。

@StreamListener("events")
public void handleEvent(@Payload String eventString, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer,
                             @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionId,
                             @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                             @Header(KafkaHeaders.OFFSET) String offset) {
   try {
       //do the logic (example: REST call)
   } catch (Exception e) { // Catch only specific exceptions that can be retried
        consumer.seek(new TopicPartition(topic, Integer.parseInt(partitionId)), Long.parseLong(offset));
   }
}

关于spring-boot - SpringBoot微服务@StreamListener抛出RunTimeException时无限次重试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52042849/

相关文章:

Spring Boot 不读取 data.sql 但它读取 import.sql

Java时间不同步

elasticsearch - Kafka-connect elasticsearch用于索引的自动小写主题名称

distributed-computing - KafKa 如何保证一致性和可用性?

spring-boot - 使用 Spring Cloud Stream Kafka 3.0.3.RELEASE 时是否可以应用正常关闭?

java - Cassandra 实体必须具有 @Table、@Persistent 或 @PrimaryKeyClass 注解

c# - 如何使用 C# 从 Kafka 获取主题列表

java - "Malformed data. Length is negative"反序列化 avro 类时

使用 Spring Cloud Stream 处理 Avro 反序列化异常

spring-boot - 同步调用多个api,spring boot webflux