spring-boot - 当所有代理都关闭时 Spring-Kafka Producer 重试

标签 spring-boot spring-kafka

我正在使用 Springboot 2.3.5.RELEASE 和 spring-kafka 2.6.3。我正在尝试做一个简单的 Kafka Producer Retry POC,这应该会导致生产者在代理关闭时或者在消息发送到代理之前抛出异常时重试。
以下生产者配置适用于启用重试的幂等生产者。

// Producer configuration
@Bean
public Map<String, Object> producerConfigs() {
   Map<String, Object> props = new HashMap<>();
   props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
   props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
   props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
   props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "3");
   props.put(ProducerConfig.RETRIES_CONFIG, "10");
   props.put(ProducerConfig.ACKS_CONFIG, "all");
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
... other configs

我知道只有在代理向生产者发送确认时出现暂时性错误时,Kafka 生产者重试才会起作用。因此,如果代理在发送消息之前就已关闭,则上述重试不起作用。 因此我尝试用 spring-kafka 引入@Retry,不幸的是我也无法让它工作。有谁知道如何解决这个问题,因为这似乎是一个常见的用例,比如如果经纪人宕机或者出现网络故障,我不希望我的制作人停止工作

@Transaction
@Retryable(value = Exception.class, maxAttemptsExpression = "10",
        backoff = @Backoff(delayExpression = "1000"))
public void sendTestMessage(String name, String number) throws RuntimeException {
    TestObject testObj = new TestObject(name + ": " + String.valueOf(number),
            Double.valueOf(Math.random() * 10000).longValue(),
            "Blah" + String.valueOf(Math.random() * 10));
    log.info("Sending sendTestMessage {}", name); 
    throwSomeException();
    // sends the message
    kafkaTemplate.send(MyConstants.SomeTOPIC, testObj);     
}  
// Throwing mock exception
public void throwSomeException() throws RuntimeException {
        log.info("Throwing mock exception {}");
        throw new RuntimeException("Mock Exception thrown while sending the 
        sendTestMessage method");
}   
@Recover
public void recover(Exception e, TestObject testObj) { 
    log.info("Recovered exception thrown  mock exception {} with payload as {}", e, testObj);
}

是否有重试可以解决上述问题。我很难相信默认情况下没有内置这样的功能。

最佳答案

我解决了这个问题,我发布这个是为了其他人的利益。 我的配置类中缺少以下内容

@Configuration
@EnableRetry <-- this was missing
public class ProducerConfig {
  ...

添加上述内容后,它工作正常。

关于spring-boot - 当所有代理都关闭时 Spring-Kafka Producer 重试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65247844/

相关文章:

spring-boot 默认的 EntityManager

spring-boot - java.lang.RuntimeException:使用本地的Spring Boot容器api时连接被拒绝

java - Spring Kafka 并发与 spring-integration

java - 无法从 kafka 主题轮询/获取所有记录

spring-boot - 如何使用 Spring Kafka 测试 Kafka Streams 应用程序?

java - 如何一起使用 mapstruct 和 springboot bean? @autowired

java - 在抽象基类中使用 @autowired

java - 如何将其他属性引用到 log4j2 yaml 配置中?

java - 设置并发> 1后spring kafka thorws InstanceAlreadyExistsException异常

docker - SSL.keystore.location 在我的 Kubernetes secret 挂载中找不到 JKS 文件