java - 如何监控 Kafka 消息的应用程序处理以进行负载测试

标签 java apache-zookeeper apache-kafka

有一个应用程序(不是我的)从 Kafka 读取消息,对其进行一些处理,并将记录存储在数据库中。我用 Java 编写了一个程序,它以给定的速率将消息写入队列。现在,它通过在测试运行结束时查询数据库来进行简单的性能测量,以确保记录输入 = 记录输出。但是,我想扩展它以定期检查队列以查看应用程序尚未处理的待处理消息数,以查看它是否得到备份。

我想我可以在 Zookeeper 中检查应用程序组 ID 的偏移量。我看着 Kafka documentation ,但它只提供了基本的消费者示例,而且 API 文档充其量也很少,所以我不确定如何找到这些信息。

我需要调用哪些 API 才能找出应用程序当前在队列中的位置,以及该位置后面的队列中有多少消息?

我将 Kafka 2.10-0.8.2.1 与单个 Zookeeper 实例和三个 Kafka 实例一起使用,负载测试器使用 0.8.2.1 Java API。所讨论的主题有三个分区(每个 Kafka 服务器一个),但是为了测试的目的,只有一个消费者。

最佳答案

我建议查看 Kafka 中已经提供的工具(如果需要直接调用 API,代码在 src 中可用)。特别是,

$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group consumer-group1 --zkconnect zkhost:zkport --topic topic1

将显示偏移和滞后:

consumer-group1,topic1,0-0 (Group,Topic,BrokerId-PartitionId)
Owner = consumer-group1-consumer1
Consumer offset = 70121994703
= 70,121,994,703 (65.31G)
Log size = 70122018287
= 70,122,018,287 (65.31G)
Consumer lag = 23584
= 23,584 (0.00G)

引用资料:

关于java - 如何监控 Kafka 消息的应用程序处理以进行负载测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31588389/

相关文章:

apache-kafka - Kafka Zookeeper-Java.net.BindException : Address already in use

java - Spring Boot kafka中基于消费者记录key的多个消费者

azure - 用于事件中心的 kafkacat

java - 在 Java 中创建的半透明或透明图像在 Android OpenGL ES 中不显示

docker - Docker 中的 Kafka 日志目录

java - 如何保持 Android 应用程序始终运行(Service 和 BroadcastReceiver)?

solr - gzip 压缩在 Solr 5.1 中不起作用

apache-kafka - 卡夫卡 : how to remove broker from Replica Set

java - 如何获取批量操作中插入/删除/更新失败的文档的数组 _ids?

java - java中try catch语句中的返回