java - Spring Boot Kafka 客户端是否有 "Circuit Breaker"?

标签 java spring-boot apache-kafka-streams spring-kafka circuit-breaker

如果 Kafka 服务器(暂时)关闭,我的 Spring Boot 应用程序 ReactiveKafkaConsumerTemplate 会不断尝试连接失败,从而导致不必要的流量并弄乱日志文件:

2021-11-10 14:45:30.265  WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-group-1, groupId=consumer-group] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
2021-11-10 14:45:32.792  WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-group-1, groupId=consumer-group] Bootstrap broker localhost:29092 (id: -1 rack: null) disconnected
2021-11-10 14:45:34.845  WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-group-1, groupId=consumer-group] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
2021-11-10 14:45:34.845  WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-group-1, groupId=consumer-group] Bootstrap broker localhost:29092 (id: -1 rack: null) disconnected

是否可以使用诸如断路器之类的东西(灵感 herehere ),以便 Spring Boot Kafka 客户端出现故障 (或者甚至更好的是连续几次失败)减慢连接尝试的速度,并且仅在服务器再次启动后才恢复到正常速度?

是否已经有现成的配置参数,或任何其他解决方案?

我知道 parameter reconnect.backoff.ms,这就是我创建 ReactiveKafkaConsumerTemplate bean 的方式:

@Bean
public ReactiveKafkaConsumerTemplate<String, MyEvent> kafkaConsumer(KafkaProperties properties) {
    final Map<String, Object> map = new HashMap<>(properties.buildConsumerProperties());
    map.put(ConsumerConfig.GROUP_ID_CONFIG, "MyGroup");
    map.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, 10_000L);
    final JsonDeserializer<DisplayCurrencyEvent> jsonDeserializer = new JsonDeserializer<>();
    jsonDeserializer.addTrustedPackages("com.example.myapplication");

    return new ReactiveKafkaConsumerTemplate<>(
            ReceiverOptions
                    .<String, MyEvent>create(map)
                    .withKeyDeserializer(new ErrorHandlingDeserializer<>(new StringDeserializer()))
                    .withValueDeserializer(new ErrorHandlingDeserializer<>(jsonDeserializer))
                    .subscription(List.of("MyTopic")));
}

消费者仍然每 3 秒尝试连接一次。

最佳答案

参见https://kafka.apache.org/documentation/#consumerconfigs_retry.backoff.ms

The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker.

https://kafka.apache.org/documentation/#consumerconfigs_reconnect.backoff.max.ms

The maximum amount of time in milliseconds to wait when reconnecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms.

关于java - Spring Boot Kafka 客户端是否有 "Circuit Breaker"?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69914621/

相关文章:

spring-boot - com.microsoft.sqlserver.jdbc.sqlserver异常 : the stream is closed

java - 在 spring boot 中包装时,语义 UI React 图标显示为空矩形

apache-kafka - 如何真正丢弃迟到的记录?

java - Kafka Streams 应用程序死亡,错误代码为 "StreamsException: Could not create internal topics."

java - 卡夫卡1.0流媒体API : message consumption from partitions get delayed

java - 在 Java 中转换为时间戳

java - 如何在Java中通过格式化来居中字符串?

java - 使用加密货币进行加密。加密失败,解密正常

java - 查询包含 java.sql.Date 列的表时,Jpa findAll() 会导致错误

java - 调试 AsyncTask - 奇怪的行为,调试跳转到代码中