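Question from Twitter:
Just trying to find a simple example using spring-kafka 2.1.7 that works with a KafkaListener and AckMode.MANUAL_IMMEDIATE, to retry the last failed message.
Best answer
It's generally better to ask questions like this on Stack Overflow (tagged with spring-kafka).
There are two approaches: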
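- Add a RetryTemplate to the listener container factory: the retries are performed in memory, and you can set back-off properties.
- Add a SeekToCurrentErrorHandler, which re-seeks the unprocessed records so that they are redelivered on the next poll.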
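To make "retries performed in memory" concrete, here is a minimal plain-Java sketch of the idea: the same call is attempted again on the same thread, with a fixed pause between attempts. It is only an illustration and does not use spring-retry; the class InMemoryRetrySketch, the method callWithRetry, and its parameters are hypothetical names invented for this sketch.

```java
import java.util.function.Supplier;

// Hypothetical illustration only; this is not spring-retry's RetryTemplate.
class InMemoryRetrySketch {

    /** Runs the task up to maxAttempts times, pausing backOffMillis between failed attempts. */
    static <T> T callWithRetry(Supplier<T> task, int maxAttempts, long backOffMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    sleep(backOffMillis); // fixed back-off before the next attempt
                }
            }
        }
        throw last; // all attempts failed: surface the last exception
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during back-off", e);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = callWithRetry(() -> {
            if (++calls[0] == 1) {
                throw new RuntimeException("failed"); // first attempt fails
            }
            return "processed foo";
        }, 3, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Because the record never leaves memory, the consumer thread is blocked while the retries and back-off run, which is the main trade-off against re-seeking.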
Here is an example:
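import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.Acknowledgment;

@SpringBootApplication
public class Twitter1Application {

    public static void main(String[] args) {
        SpringApplication.run(Twitter1Application.class, args);
    }

    // Fail only the first delivery so that the redelivery shows up in the log.
    boolean fail = true;

    @KafkaListener(id = "foo", topics = "twitter1")
    public void listen(String in, Acknowledgment ack) {
        System.out.println(in);
        if (fail) {
            fail = false;
            throw new RuntimeException("failed");
        }
        ack.acknowledge();
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
        // or factory.setRetryTemplate(aRetryTemplate);
        // and factory.setRecoveryCallback(aRecoveryCallback);
        return factory;
    }

    // Send two records shortly after startup.
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            Thread.sleep(2000);
            template.send("twitter1", "foo");
            template.send("twitter1", "bar");
        };
    }

    // One partition, replication factor 1.
    @Bean
    public NewTopic topic() {
        return new NewTopic("twitter1", 1, (short) 1);
    }

}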
and
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.listener.ack-mode=manual-immediate
logging.level.org.springframework.kafka=debug
and
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>twitter1</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>twitter1</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
(Boot 2.0.4 pulls in spring-kafka 2.1.8, which is the current version.)
and
foo
2018-08-13 17:36:14.901 ERROR 3945 --- [ foo-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is ...
2018-08-13 17:36:15.396 DEBUG 3945 --- [ foo-0-C-1] essageListenerContainer$ListenerConsumer : Received: 2 records
foo
2018-08-13 17:36:15.398 DEBUG 3945 --- [ foo-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {twitter1-0=OffsetAndMetadata{offset=5, metadata=''}}
bar
2018-08-13 17:36:15.403 DEBUG 3945 --- [ foo-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {twitter1-0=OffsetAndMetadata{offset=6, metadata=''}}
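In the log above, foo is printed twice: the first delivery fails, the error handler seeks back to the failed offset, and the next poll returns foo and bar again. The plain-Java sketch below simulates that behavior with a list standing in for the partition. It is a simplified model, not spring-kafka's implementation; SeekToCurrentSketch, consume, and maxPolls are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical simulation only; this is not spring-kafka's SeekToCurrentErrorHandler.
class SeekToCurrentSketch {

    /**
     * Delivers records from the log to the listener. When the listener throws, the rest
     * of the batch is dropped and the position stays at the failed offset, so the next
     * poll redelivers that record. Returns every delivery in order.
     */
    static List<String> consume(List<String> log, Consumer<String> listener, int maxPolls) {
        List<String> deliveries = new ArrayList<>();
        int position = 0; // offset of the next record to fetch
        int polls = 0;
        while (position < log.size() && polls++ < maxPolls) {
            int offset = position;
            for (; offset < log.size(); offset++) { // one "poll" returns everything from position
                String record = log.get(offset);
                deliveries.add(record);
                try {
                    listener.accept(record);
                } catch (RuntimeException e) {
                    break; // stop at the failed record; offset still points at it
                }
            }
            position = offset; // the "seek": resume from the first unprocessed record
        }
        return deliveries;
    }

    public static void main(String[] args) {
        boolean[] fail = {true};
        List<String> deliveries = consume(Arrays.asList("foo", "bar"), record -> {
            if (fail[0]) {
                fail[0] = false;
                throw new RuntimeException("failed"); // fail only the first delivery
            }
        }, 10);
        System.out.println(deliveries); // prints [foo, foo, bar]
    }
}
```

The maxPolls bound exists only so the simulation terminates; without some limit or recovery step, a record that always fails would be redelivered indefinitely.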
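In the upcoming 2.2 release, the error handler can be configured with a recoverer, and a standard recoverer is provided to publish the failed record to a dead-letter topic.
Commit here. Docs here.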
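The recoverer idea can be sketched in plain Java as follows: after a record has failed a fixed number of times, it is handed to a recoverer callback and skipped, so that later records are not blocked. This models the concept only and does not show the 2.2 API; RecovererSketch, consume, maxFailures, and the in-memory deadLetters list (standing in for a dead-letter topic) are assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical simulation only; it does not use spring-kafka classes.
class RecovererSketch {

    /** Redelivers a failing record until it has failed maxFailures times, then recovers and skips it. */
    static void consume(List<String> log, Consumer<String> listener,
                        BiConsumer<String, RuntimeException> recoverer, int maxFailures) {
        int position = 0; // offset of the record being processed
        int failures = 0; // failures seen so far for the record at position
        while (position < log.size()) {
            String record = log.get(position);
            try {
                listener.accept(record);
                position++;
                failures = 0;
            } catch (RuntimeException e) {
                if (++failures >= maxFailures) {
                    recoverer.accept(record, e); // e.g. publish to a dead-letter topic
                    position++;
                    failures = 0;
                }
                // otherwise the position is unchanged, so the record is redelivered
            }
        }
    }

    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        List<String> deadLetters = new ArrayList<>(); // stand-in for a dead-letter topic
        consume(Arrays.asList("foo", "bad", "bar"), record -> {
            if (record.equals("bad")) {
                throw new RuntimeException("failed");
            }
            processed.add(record);
        }, (record, ex) -> deadLetters.add(record), 3);
        System.out.println("processed=" + processed + " deadLetters=" + deadLetters);
    }
}
```

Here the record "bad" is attempted three times, then moved to deadLetters, and processing continues with "bar".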
Regarding Spring Kafka: how to retry with @KafkaListener, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/51831034/