我有一个 @StreamListener 方法,它将执行 REST 调用。当REST调用返回异常时,@StreamListener方法将抛出RunTimeException并执行重试。 @StreamListener方法抛出RuntimeException时将无限次重试
Spring Cloud Stream重试配置:
spring.cloud.stream.kafka.bindings.inputChannel.consumer.enableDlq=true
spring.cloud.stream.bindings.inputChannel.consumer.maxAttempts=3
spring.cloud.stream.bindings.inputChannel.consumer.concurrency=3
spring.cloud.stream.bindings.inputChannel.consumer.backOffInitialInterval=300000
spring.cloud.stream.bindings.inputChannel.consumer.backOffMaxInterval=600000
SpringBoot微服务依赖版本:
Spring Boot 2.0.3
Spring Cloud Stream Elmhurst.RELEASE
Kafka broker 1.1.0
最佳答案
使用RetryTemplate或增加maxAttempts属性有一个限制,即重试必须在max.poll.interval.ms
内完成,否则Kafka Broker会认为消费者已关闭并将分区重新分配给另一个消费者(如果有的话)。
另一个选项是让监听器使用 consumer.seek
方法从 Kafka 重新读取相同的消息。
@StreamListener("events")
public void handleEvent(@Payload String eventString, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionId,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.OFFSET) String offset) {
try {
//do the logic (example: REST call)
} catch (Exception e) { // Catch only specific exceptions that can be retried
consumer.seek(new TopicPartition(topic, Integer.parseInt(partitionId)), Long.parseLong(offset));
}
}
关于spring-boot - SpringBoot微服务@StreamListener抛出RunTimeException时无限次重试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52042849/