java - 如何进行最高效的 Kafka 健康检查控制?

标签 java spring apache-kafka health-monitoring

我使用spring kafka。我想确定 Kafka 已关闭。 kafka有这样的实现吗?我找到了各种解决方案。但我发现的所有解决方案都必须编写计划作业。有没有一种方法可以显示kafka已关闭而无需编写计划作业?

我能找到的解决方案:

由于

中公开的原因,此解决方案已恢复

https://github.com/spring-projects/spring-boot/issues/12225

https://github.com/spring-projects/spring-boot/issues/12222

https://github.com/spring-projects/spring-boot/pull/12200

所以,我不想使用这个解决方案。

@Configuration
public class KafkaConfig {

@Autowired
private KafkaAdmin admin;

@Autowired
private MeterRegistry meterRegistry;

@Autowired
private Map<String, KafkaTemplate<?, ?>> kafkaTemplates;

@Bean
public AdminClient kafkaAdminClient() {
    return AdminClient.create(admin.getConfig());
}

@SuppressWarnings("deprecation") // Can be avoided by relying on Double.NaN for non doubles.
@PostConstruct
private void initMetrics() {
    final String kafkaPrefix = "kafka.";
    for (Entry<String, KafkaTemplate<?, ?>> templateEntry : kafkaTemplates.entrySet()) {
        final String name = templateEntry.getKey();
        final KafkaTemplate<?, ?> kafkaTemplate = templateEntry.getValue();
        for (Metric metric : kafkaTemplate.metrics().values()) {
            final MetricName metricName = metric.metricName();
            final Builder<Metric> gaugeBuilder = Gauge
                    .builder(kafkaPrefix + metricName.name(), metric, Metric::value) // <-- Here
                    .description(metricName.description());
            for (Entry<String, String> tagEntry : metricName.tags().entrySet()) {
                gaugeBuilder.tag(kafkaPrefix + tagEntry.getKey(), tagEntry.getValue());
            }
            gaugeBuilder.tag("bean", name);
            gaugeBuilder.register(meterRegistry);
        }
    }
}

@Bean
public HealthIndicator kafkaHealthIndicator() {
    final DescribeClusterOptions describeClusterOptions = new DescribeClusterOptions().timeoutMs(1000);
    final AdminClient adminClient = kafkaAdminClient();
    return () -> {
        final DescribeClusterResult describeCluster = adminClient.describeCluster(describeClusterOptions);
        try {
            final String clusterId = describeCluster.clusterId().get();
            final int nodeCount = describeCluster.nodes().get().size();
            return Health.up()
                    .withDetail("clusterId", clusterId)
                    .withDetail("nodeCount", nodeCount)
                    .build();
        } catch (InterruptedException | ExecutionException e) {
            return Health.down()
                    .withException(e)
                    .build();
        }
    };

}
}

我发现的另一个解决方案:

public class KafkaHealthIndicator implements HealthIndicator {
private final Logger log = LoggerFactory.getLogger(KafkaHealthIndicator.class);

private KafkaTemplate<String, String> kafka;

public KafkaHealthIndicator(KafkaTemplate<String, String> kafka) {
    this.kafka = kafka;
}

/**
 * Return an indication of health.
 *
 * @return the health for
 */
@Override
public Health health() {
    try {
        kafka.send("kafka-health-indicator", "❥").get(100, TimeUnit.MILLISECONDS);
    } catch (InterruptedException | ExecutionException | TimeoutException e) {
        return Health.down(e).build();
    }
    return Health.up().build();
}
}

在两个解决方案中,我必须使用计划作业。有没有一种方法可以显示kafka已关闭而无需编写预定作业?如果没有可用的方法,第二个解决方案足以显示kafka已关闭?

最佳答案

据我所知,所有 Spring 健康检查都是计划检查。

我信任您在其他项目中放入的第一个代码块,因为它依赖于健康的 Kafka Controller 来返回 Activity 代理列表。我建议的一项补充是在类中添加一个字段,用于表示您希望成为 nodeCount

一部分的代理的最小数量。

关于java - 如何进行最高效的 Kafka 健康检查控制?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57294453/

相关文章:

java while 循环没有按预期工作

java - Spring mvc,制作菜单数据对象

java - Spring Security "No bean named ' filterChainProxy'可用“错误

java - Spring Kafka 以 TCP 端口作为生产者

Spring Cloud Stream Kafka Streams Binder KafkaException : Could not start stream: 'listener' cannot be null

java - % 在 url http 请求 java 中

java - 如何在 jenkins 上访问 com.sun.management.ThreadMXBean 的 ThreadMx,junit 测试

java - 递归未导航所有子节点

security - Spark 卡夫卡 安全 kerberos

java - 卡夫卡点对点