我创建了一个实现 org.apache.kafka.common.metrics.KafkaMetric
的类像这样:
public class DatadogMetricTracker implements MetricsReporter {
@Override
public void configure(Map<String, ?> configs) {
System.out.println(configs);
}
@Override
public void init(List<KafkaMetric> metrics) {
System.out.println(metrics);
}
@Override
public void metricChange(KafkaMetric metric) {
System.out.println(metric.metricName().name() + ": " + metric.value() + " tags: " + metric.metricName().tags());
}
@Override
public void metricRemoval(KafkaMetric metric) {
}
@Override
public void close() {
}
}
然后,当我设置 Kafka props 时,我将该类注册为指标报告者:
properties.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "com.myco.utils.DatadogMetricTracker");
当我启动我的消费者时,configure
接到电话并 init
,然后metricChange
使用一批值全部为 0 或 -Infinity 的指标调用一次,然后永远不会再次调用它。如何让我的指标记录器再次启动?
谢谢!
最佳答案
检查了代码,除了最初注册指标之外,没有调用 metricChange 方法。所以最好使用一些替代方法。截至目前,我将实现定期读取指标并发布它们,因为在我们的例子中,普罗米修斯将从我们的应用程序中抓取指标。更重要的一点是,由于值已经在内部计算,因此将仅使用量规来填充 prometheus 的值
它适用于 JMX 报告器,因为它们创建动态 bean,并且当检查 Mbean 的属性值时(通过 JConsole 或其他东西),然后调用获取指标值的实际方法,因此它可以在那里工作,代码如下:
public Object getAttribute(String name) throws AttributeNotFoundException, MBeanException, ReflectionException {
if (this.metrics.containsKey(name))
return this.metrics.get(name).metricValue();
else
throw new AttributeNotFoundException("Could not find attribute " + name);
}
关于java - Kafka Consumer Custom MetricReporter 未接收指标,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52212173/