spring-boot - Spring Kafka - Manual acknowledgment

Tags: spring-boot apache-kafka spring-kafka

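I have a spring-boot application that listens to a Kafka stream and sends each record to a service for further processing. That service can sometimes fail. The exception cases are noted in the comments. For now I have simulated both the success and the exception scenarios of the service myself.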

Listener code:

@Autowired
PlanitService service;

@KafkaListener(
        topics = "${app.topic}",
        groupId = "notifGrp",
        containerFactory = "storeKafkaListener")
public void processStoreNotify(StoreNotify store) throws RefrigAlarmNotifyException {
    service.planitStoreNotification(store);

    // Some other logic which throws custom exception
    // RefrigAlarmNotifyException
}

The consumer factory configuration is as follows:

@Bean
    public ConsumerFactory<String, StoreNotify> storeConsumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getConsumerBootstrapServers());
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "notifGrp");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        try (ErrorHandlingDeserializer2<String> headerErrorHandlingDeserializer = new ErrorHandlingDeserializer2<>(
                new StringDeserializer());
                ErrorHandlingDeserializer2<StoreNotify> errorHandlingDeserializer = new ErrorHandlingDeserializer2<>(
                        new JsonDeserializer<>(StoreNotify.class, objectMapper()))) {
            return new DefaultKafkaConsumerFactory<>(config, headerErrorHandlingDeserializer,
                    errorHandlingDeserializer);
        }
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, StoreNotify> storeKafkaListener() {
        ConcurrentKafkaListenerContainerFactory<String, StoreNotify> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(storeConsumerFactory());
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setAckMode(AckMode.RECORD);
        //factory.setMessageConverter(new ByteArrayJsonMessageConverter());     

        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
                (r, e) -> {

                    LOGGER.error("Exception is of type: ", e);
                    if (e instanceof RestClientException) {
                        LOGGER.error("RestClientException while processing {} ", r.value(), e);
                        return new TopicPartition(storeDeadLtrTopic, r.partition());
                    }
                    else {
                        LOGGER.error("Generic exception while processing {} ", r.value(), e);
                        return new TopicPartition(storeErrorTopic, r.partition());
                    }
                });
        factory.setErrorHandler(new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 0L)));
        return factory;
    }

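Since the REST service throws a RestClientException, it should enter the if block shown above. As for FixedBackOff, I don't want SeekToCurrentErrorHandler to retry, so I passed 0L as the second argument. I just want it to send the record to the specified topic. Please correct me if I'm wrong. The exception stack trace is: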

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.demo.ran.consumer.StoreKafkaConsumer.processStoreNotifMessage(com.demo.ran.model.StoreNotify) throws com.demo.ran.exception.RefrigAlarmNotifyException' threw exception; nested exception is org.springframework.web.client.RestClientException: Service exception; nested exception is org.springframework.web.client.RestClientException: Service exception
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1742) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1730) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1647) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1577) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1485) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1235) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:985) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:905) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_241]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_241]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_241]
Caused by: org.springframework.web.client.RestClientException: Service exception
    at com.demo.ran.service.PlanitService.planitStoreNotification(PlanitService.java:53) ~[classes/:na]
    at com.demo.ran.consumer.StoreKafkaConsumer.processStoreNotifMessage(StoreKafkaConsumer.java:48) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_241]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_241]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_241]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_241]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:326) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:86) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1696) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1679) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1634) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    ... 8 common frames omitted
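
The stack trace above shows the RestClientException nested inside a ListenerExecutionFailedException. A check such as `e instanceof RestClientException` on the exception handed to the destination resolver may therefore not match, and the record could end up on the error topic instead. One possible workaround is to walk the cause chain. The sketch below uses only the JDK; `CauseRouting`, `ServiceCallException`, and the topic names are hypothetical stand-ins, not part of the question's code or of Spring.

```java
/** Sketch: choose a dead-letter destination by inspecting the cause chain. */
class CauseRouting {

    /** Hypothetical stand-in for org.springframework.web.client.RestClientException. */
    static class ServiceCallException extends RuntimeException {
        ServiceCallException(String message) {
            super(message);
        }
    }

    /** True if thrown, or any exception in its cause chain, is an instance of type. */
    static boolean hasCause(Throwable thrown, Class<? extends Throwable> type) {
        for (Throwable t = thrown; t != null; t = t.getCause()) {
            if (type.isInstance(t)) {
                return true;
            }
        }
        return false;
    }

    /** Topic names are placeholders, not the question's actual topics. */
    static String resolveTopic(Throwable thrown) {
        return hasCause(thrown, ServiceCallException.class) ? "store.dead-letter" : "store.error";
    }

    public static void main(String[] args) {
        // Mimics the wrapping seen in the stack trace: outer container exception, inner service exception.
        Throwable wrapped = new RuntimeException("listener threw",
                new ServiceCallException("Service exception"));
        System.out.println(wrapped instanceof ServiceCallException); // false: the top-level type is the wrapper
        System.out.println(resolveTopic(wrapped));                    // store.dead-letter
    }
}
```

In the question's lambda the same idea would mean testing `e.getCause()` (or the whole chain) rather than `e` itself.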

Best answer

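You don't need manual acknowledgment for this use case; just configure a SeekToCurrentErrorHandler and throw the exception to the container. It will discard the unprocessed records, perform the seeks, and redeliver the failed message.

See the documentation.

You can configure the error handler with a DeadLetterPublishingRecoverer, which can be used to send the record to a dead-letter topic after a certain number of retries.

You can also configure which exceptions are retryable.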
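
The behaviour described above (redeliver a failed record a limited number of times, skip retries for some exception types, then hand the record to a recoverer) can be illustrated with a toy model. This is a JDK-only sketch, not Spring Kafka's implementation: `RetryThenRecover`, `maxRetries`, and `notRetryable` are hypothetical names. Here `maxRetries` is assumed to play the role of the second `FixedBackOff` argument, so `0` means a single delivery followed by recovery.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Toy model of "retry, then recover"; not Spring Kafka's implementation. */
class RetryThenRecover<R> {

    private final int maxRetries;                               // retries allowed after the first attempt
    private final Set<Class<? extends Exception>> notRetryable; // types that go straight to recovery
    private final BiConsumer<R, Exception> recoverer;           // e.g. publish to a dead-letter topic

    RetryThenRecover(int maxRetries, Set<Class<? extends Exception>> notRetryable,
                     BiConsumer<R, Exception> recoverer) {
        this.maxRetries = maxRetries;
        this.notRetryable = notRetryable;
        this.recoverer = recoverer;
    }

    /** Delivers one record and returns how many times the listener was invoked. */
    int deliver(R record, Consumer<R> listener) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                listener.accept(record);
                return attempts; // success: nothing to recover
            } catch (RuntimeException e) {
                boolean retryable = notRetryable.stream().noneMatch(t -> t.isInstance(e));
                if (!retryable || attempts > maxRetries) {
                    recoverer.accept(record, e); // retries exhausted or skipped
                    return attempts;
                }
                // otherwise loop again: the same record is redelivered
            }
        }
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        RetryThenRecover<String> handler = new RetryThenRecover<String>(
                2, Set.of(IllegalArgumentException.class), (r, e) -> deadLetters.add(r));

        int attempts = handler.deliver("record-1", r -> {
            throw new IllegalStateException("service down");
        });
        System.out.println(attempts + " attempts, dead letters: " + deadLetters);
    }
}
```

The key point the model shares with the answer is that the listener lets the exception escape; the handler around it decides whether to redeliver or recover.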

        } catch (Exception exception) {
            LOGGER.error("Exception while calling the service  ", exception);
            // Ignore the record
        }

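You must not "eat" the exception like that; let it propagate to the container.

When using manual acknowledgment, you must add an Acknowledgment parameter to the listener method and acknowledge it.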
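
To make the manual-acknowledgment contract concrete, here is a JDK-only toy model. In Spring Kafka the parameter type is `org.springframework.kafka.support.Acknowledgment` and the container's AckMode has to be MANUAL or MANUAL_IMMEDIATE; the `Acknowledgment` and `AckListener` interfaces and the `consume` helper below are hypothetical stand-ins that only mimic that shape, so the idea can run without a broker.

```java
import java.util.List;

/** Toy model of manual acknowledgment; the types here are hypothetical stand-ins. */
class ManualAckSketch {

    /** Mimics the shape of Spring Kafka's Acknowledgment: a single acknowledge() call. */
    interface Acknowledgment {
        void acknowledge();
    }

    /** Listener that receives the record plus a handle used to confirm it. */
    interface AckListener<R> {
        void onMessage(R record, Acknowledgment ack);
    }

    /**
     * Feeds records to the listener in order and returns the next offset to commit,
     * i.e. one past the last acknowledged record. Stops at the first failure.
     */
    static <R> int consume(List<R> records, AckListener<R> listener) {
        int[] committed = {0};
        for (int offset = 0; offset < records.size(); offset++) {
            final int next = offset + 1;
            try {
                listener.onMessage(records.get(offset), () -> committed[0] = next);
            } catch (RuntimeException e) {
                break; // the failed record was never acknowledged, so it stays uncommitted
            }
        }
        return committed[0];
    }

    public static void main(String[] args) {
        int committed = consume(List.of("a", "b", "c"), (record, ack) -> {
            if (record.equals("c")) {
                throw new IllegalStateException("service failed");
            }
            ack.acknowledge(); // acknowledge only after processing succeeded
        });
        System.out.println(committed); // 2: "c" is not committed and would be seen again
    }
}
```

The listener acknowledges only after its work succeeds, so a record whose processing throws is left uncommitted.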

The original question, "spring-boot - Spring Kafka - Manual acknowledgment", can be found on Stack Overflow: https://stackoverflow.com/questions/60730281/
