我想使用 Kafka 高级消费者 API,同时我想禁用偏移量的自动提交。我试图通过以下步骤实现这一目标。
1) auto.commit.enable = false
2) offsets.storage = kafka
3) dual.commit.enabled = false
我创建了一个偏移量管理器,它定期向 kafka 创建 offsetcommit 请求并提交偏移量。
还有以下问题
1) 高级消费者 API 是否自动从 kafka 存储中获取偏移量并使用该偏移量初始化自身?或者我应该使用简单的消费者 API 来实现这个目标吗?
2) 基于 kafka 的抵消存储是否在所有代理中重复?还是仅在一个经纪人上维护?
最佳答案
I created a offset manager, which periodically creates offsetcommit request to kafka and commits the offset.
如果您使用的是高级消费者,它为您提供了手动提交偏移量的方法,则不需要这样做,javadoc (在手动偏移控制下)为您提供了有关如何执行此操作的示例。
1) Does high level consumer API automatically fetches offset from kafka storage and initializes itself with that offset? Or should I use simple consumer API to achieve this?
高级消费者将在您重新启动时负责获取最后提交的偏移量,因此您可以从中断的地方继续消费。
2) Does kafka based storage for offsets is repicated across all brokers? Or it is maintained on only one broker?
Kafka 将消费者偏移量存储在名为 __consumer_offsets
的内部主题中,默认情况下其复制因子设置为 3,有 50 个分区。所以它被复制到 3 个经纪人。您可以在 broker config 中找到有关其配置的更多信息,它们以 offset
或 offsets
开头。
关于java - Kafka 使用高级消费者 API 抵消提交请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29909179/