java - 如何获取用于日志记录的kafka consumer-id

标签 java apache-kafka spring-kafka

在我的应用程序中,我使用 spring-kafka 来消费来自 kafka 服务器的消息,但是我从控制台消费者那里得到了所有活跃的消费者线程的 consumer-id

TOPIC            PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                    HOST            CLIENT-ID
easytest-events    9          247367          247367          0             p3-S14-0-e6a1d3cb-8ab3-435f-9f53-5081a6e8f812 /10.66.56.129   p3-S14-0

有没有办法通过代码获取consumer-id,以便我可以比较它们

最佳答案

consumer-id 似乎是附加了 UUID 的 client-id - 因此您可以只使用 client-id(您可以将其设置为任何您想要的)。 Spring 会添加-0、-1 等。

您可以在分配分区时在日志中看到线程数...

2018-08-31 09:34:27.869  INFO 55748 --- [o52105744-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so52105744-0]
2018-08-31 09:34:27.876  INFO 55748 --- [o52105744-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so52105744-3]
2018-08-31 09:34:27.876  INFO 55748 --- [o52105744-1-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so52105744-2]
2018-08-31 09:34:27.876  INFO 55748 --- [o52105744-9-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so52105744-1]
2018-08-31 09:34:27.876  INFO 55748 --- [o52105744-3-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so52105744-4]
2018-08-31 09:34:27.876  INFO 55748 --- [o52105744-6-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so52105744-7]
2018-08-31 09:34:27.876  INFO 55748 --- [o52105744-5-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so52105744-6]
2018-08-31 09:34:27.876  INFO 55748 --- [o52105744-4-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so52105744-5]
2018-08-31 09:34:27.877  INFO 55748 --- [o52105744-7-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so52105744-8]
2018-08-31 09:34:27.877  INFO 55748 --- [o52105744-8-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [so52105744-9]

关于java - 如何获取用于日志记录的kafka consumer-id,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52105744/

相关文章:

scala - 控制结构化 Spark Streaming 的微批处理

java - Springboot的@Value注解找不到Kafka Producer认证的JKS文件位置

java - 如何在尝试处理文件之前验证文件是否已完成写入?

java - Hibernate 按条件映射的一对多映射

java - Android - 添加按钮后应用程序不断崩溃

java - 从 java 的 main 中调用不同的方法并且错误 : cannot find symbol

python - 从 kafka 消费者读取时,如何从用于 avro 记录的架构注册表中查找架构 ID

java - 在单元测试中与 KafkaEmbedded 一起使用时,@DirtiesContext 的行为是什么?

spring-kafka - Spring Kafka Stop 容器出现异常

Spring Kafka JsonDesirialization MessageConversionException 无法解析类名 未找到类