spring - 使用DeadLetterPublishingRecoverer处理Spring-kafka错误

标签 spring spring-boot error-handling apache-kafka spring-kafka

我正在尝试在Spring Boot Kafa中实现错误处理。在我的Kafka监听器中,我抛出了如下的运行时异常:

@KafkaListener(topics= "Kafka-springboot-example", groupId="group-employee-json")
    public void consumeEmployeeJson(Employee employee) {
        logger.info("Consumed Employee JSON: "+ employee);

        if(null==employee.getEmployeeId()) {
            throw new RuntimeException("failed");
            //throw new ListenerExecutionFailedException("failed");
        }
    }

而且我已经按照如下配置了错误处理:
@Configuration
@EnableKafka
public class KafkaConfiguration {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory,
            KafkaTemplate<Object, Object> template){

        ConcurrentKafkaListenerContainerFactory<Object, Object> factory= new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.setErrorHandler(new SeekToCurrentErrorHandler(
                new DeadLetterPublishingRecoverer(template)));

        return factory;

    }
}

我的DLT监听器如下所示:
@KafkaListener(topics= "Kafka-springboot-example.DLT", groupId="group-employee-json")
    public void consumeEmployeeErrorJson(Employee employee) {
        logger.info("Consumed Employee JSON frpm DLT topic: "+ employee);
    }

但是我的消息没有发布到DLT主题。

知道我在做什么错吗?

编辑:

application.properties
server.port=8088

#kafka-producer-config
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer


#Kafka consumer properties
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=group-employee-json
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*

最佳答案

public ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory(
如果将非标准bean名称用于容器工厂,则需要在@KafkaListener属性的containerFactory上进行设置。

缺省的bean名称是kafkaListenerContainerFactory,它由Boot自动配置。您需要覆盖该bean或将侦听器配置为指向您的非标准bean名称。

关于spring - 使用DeadLetterPublishingRecoverer处理Spring-kafka错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61049654/

相关文章:

java - 如何在 spring security 中检查每个请求的用户状态

java - 找不到 Spring 类(class)

php - 隐藏 “Creating default object from empty value in”输出

java - 从控制台禁用/更改 Spring Boot 的 ApplicationContext 的时间戳

actionscript-3 - AS3-addChild错误#1009-IOErrorEvent.IO_ERROR

python - 发生错误时运行函数而不尝试

带有可分页的 Spring 自定义查询

Java日历,将任务设置为重复

java - 如何将带有 '/' 符号的值作为获取参数传递给我的 Controller ?

java - 如何刷新/更新@Autowired EurekaClient