Spring Kafka - how to retry with @KafkaListener

Tags: spring spring-boot spring-kafka

A question from Twitter:

Just trying to find a simple example using spring-kafka 2.1.7 that works with a KafkaListener and AckMode.MANUAL_IMMEDIATE, to retry the last failed message.

https://twitter.com/tolbier/status/1028936942447149056

Best answer

It is generally better to ask questions like this on Stack Overflow (with the appropriate tag).

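There are two approaches:

  • Add a RetryTemplate to the listener container factory - retries are performed in memory, and you can set back-off properties.
  • Add a SeekToCurrentErrorHandler, which re-seeks the unprocessed records so that they are fetched again on the next poll.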
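To make the first approach more concrete, here is a minimal plain-Java sketch of what "retrying in memory with a back-off" means. It is a conceptual model only, not the Spring Retry or spring-kafka API: the class `InMemoryRetry`, its `execute` method, and all parameter names are hypothetical and chosen for illustration.

```java
import java.util.function.Function;
import java.util.function.Supplier;

/** Plain-Java model of in-memory retry with a fixed back-off (hypothetical, not Spring Retry). */
final class InMemoryRetry {

    /**
     * Calls the action up to maxAttempts times, sleeping backOffMillis between
     * failed attempts. If every attempt fails, the recovery function receives
     * the last exception and supplies the result instead.
     */
    static <T> T execute(Supplier<T> action, int maxAttempts, long backOffMillis,
            Function<RuntimeException, T> recovery) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                last = e;
                if (attempt < maxAttempts) {
                    sleep(backOffMillis); // back off before the next attempt
                }
            }
        }
        return recovery.apply(last); // retries exhausted
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during back-off", e);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = execute(() -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("failed");
            }
            return "processed on attempt " + calls[0];
        }, 3, 10L, e -> "recovered: " + e.getMessage());
        System.out.println(result); // prints "processed on attempt 3"
    }
}
```

Because the retries happen inside the listener thread, the consumer does not poll while they run, so the total retry time should stay well below the consumer's poll timeout settings.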

Here is an example:

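import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.Acknowledgment;

@SpringBootApplication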
public class Twitter1Application {

    public static void main(String[] args) {
        SpringApplication.run(Twitter1Application.class, args);
    }

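    // Fail only the first delivery so that the retry can be observed.
    boolean fail = true;

    @KafkaListener(id = "foo", topics = "twitter1")
    public void listen(String in, Acknowledgment ack) {
        System.out.println(in);
        if (fail) {
            fail = false;
            throw new RuntimeException("failed");
        }
        // Manual ack mode: the offset is committed only after successful processing.
        ack.acknowledge();
    }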

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
        // or factory.setRetryTemplate(aRetryTemplate);
        // and factory.setRecoveryCallback(aRecoveryCallback);
        return factory;
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            Thread.sleep(2000);
            template.send("twitter1", "foo");
            template.send("twitter1", "bar");
        };
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("twitter1", 1, (short) 1);
    }

}

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false

spring.kafka.listener.ack-mode=manual-immediate

logging.level.org.springframework.kafka=debug

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>twitter1</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>twitter1</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>

(Boot 2.0.4 pulls in spring-kafka 2.1.8, which is the current version.)

foo
2018-08-13 17:36:14.901 ERROR 3945 --- [      foo-0-C-1] essageListenerContainer$ListenerConsumer : Error handler threw an exception

org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is  ...    

2018-08-13 17:36:15.396 DEBUG 3945 --- [      foo-0-C-1] essageListenerContainer$ListenerConsumer : Received: 2 records
foo
2018-08-13 17:36:15.398 DEBUG 3945 --- [      foo-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {twitter1-0=OffsetAndMetadata{offset=5, metadata=''}}
bar
2018-08-13 17:36:15.403 DEBUG 3945 --- [      foo-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {twitter1-0=OffsetAndMetadata{offset=6, metadata=''}}
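The log above shows "foo" being delivered twice and offsets 5 and 6 being committed. The following toy model reproduces that sequence in plain Java, assuming a single partition in which "foo" sits at offset 4. It is a sketch of the seek-to-current idea, not the spring-kafka implementation; `SeekToCurrentModel` and its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Toy single-partition model of seek-to-current redelivery (hypothetical, not spring-kafka). */
final class SeekToCurrentModel {

    final List<String> delivered = new ArrayList<>(); // every value handed to the listener
    final List<Long> committed = new ArrayList<>();   // offsets committed after success

    /**
     * Delivers records until the end of the log is reached. When the listener
     * throws, the position is moved back to the failed record, so the next
     * "poll" returns that record and everything after it again. A record that
     * always fails would be redelivered forever in this model.
     */
    void consume(List<String> log, long baseOffset, Consumer<String> listener) {
        int position = 0; // index of the next record to fetch
        while (position < log.size()) {
            int batchStart = position;
            position = log.size(); // assume the whole batch will be consumed
            for (int i = batchStart; i < log.size(); i++) {
                String value = log.get(i);
                delivered.add(value);
                try {
                    listener.accept(value);
                    committed.add(baseOffset + i + 1); // committed offset = next offset to read
                } catch (RuntimeException e) {
                    position = i; // seek back to the failed record
                    break;        // discard the rest of this batch
                }
            }
        }
    }

    public static void main(String[] args) {
        boolean[] fail = {true};
        SeekToCurrentModel model = new SeekToCurrentModel();
        model.consume(Arrays.asList("foo", "bar"), 4L, value -> {
            if (fail[0]) {
                fail[0] = false;
                throw new RuntimeException("failed");
            }
        });
        System.out.println(model.delivered); // prints [foo, foo, bar]
        System.out.println(model.committed); // prints [5, 6]
    }
}
```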

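In the upcoming 2.2 release, the error handler can be configured with a recoverer, and a standard recoverer is provided that publishes the failed record to a dead-letter topic.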

Commit here. Docs here.
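The recoverer idea can be sketched in plain Java as follows: a record that keeps failing is handed to a recoverer after a fixed number of failures and then skipped, instead of being redelivered indefinitely. This is only an illustration under assumed names (`RecoveringRedeliveryModel`, `maxFailures`, and the in-memory dead-letter list are hypothetical); the actual 2.2 API is described in the linked documentation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Toy model of an error handler with a recoverer (hypothetical, not spring-kafka). */
final class RecoveringRedeliveryModel {

    /**
     * Delivers each record in order. A failed record is redelivered until it
     * has failed maxFailures times; it is then passed to the recoverer and
     * skipped. Returns the total number of deliveries made.
     */
    static int consume(List<String> log, Consumer<String> listener, int maxFailures,
            BiConsumer<String, RuntimeException> recoverer) {
        int deliveries = 0;
        int position = 0;          // index of the next record to deliver
        int failuresAtPosition = 0; // failures seen for the current record
        while (position < log.size()) {
            String value = log.get(position);
            deliveries++;
            try {
                listener.accept(value);
                position++;
                failuresAtPosition = 0;
            } catch (RuntimeException e) {
                failuresAtPosition++;
                if (failuresAtPosition >= maxFailures) {
                    recoverer.accept(value, e); // e.g. publish to a dead-letter destination
                    position++;
                    failuresAtPosition = 0;
                }
                // otherwise the position is unchanged, so the record is redelivered
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>(); // stands in for a dead-letter topic
        int deliveries = consume(Arrays.asList("foo", "bad", "bar"), value -> {
            if (value.equals("bad")) {
                throw new IllegalArgumentException("cannot process");
            }
        }, 3, (value, e) -> deadLetters.add(value + ": " + e.getMessage()));
        System.out.println(deliveries);  // prints 5
        System.out.println(deadLetters); // prints [bad: cannot process]
    }
}
```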

Regarding Spring Kafka - how to retry with @KafkaListener, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/51831034/
