java - Kafka 使用高级消费者 API 抵消提交请求

标签 java apache-kafka

我想使用 Kafka 高级消费者 API,同时我想禁用偏移量的自动提交。我试图通过以下步骤实现这一目标。

1) auto.commit.enable = false
2) offsets.storage = kafka
3) dual.commit.enabled = false

我创建了一个偏移量管理器,它定期向 kafka 创建 offsetcommit 请求并提交偏移量。

还有以下问题

1) 高级消费者 API 是否自动从 kafka 存储中获取偏移量并使用该偏移量初始化自身?或者我应该使用简单的消费者 API 来实现这个目标吗?

2) 基于 kafka 的抵消存储是否在所有代理中重复?还是仅在一个经纪人上维护?

最佳答案

I created a offset manager, which periodically creates offsetcommit request to kafka and commits the offset.

如果您使用的是高级消费者,它为您提供了手动提交偏移量的方法,则不需要这样做,javadoc (在手动偏移控制下)为您提供了有关如何执行此操作的示例。

1) Does high level consumer API automatically fetches offset from kafka storage and initializes itself with that offset? Or should I use simple consumer API to achieve this?

高级消费者将在您重新启动时负责获取最后提交的偏移量,因此您可以从中断的地方继续消费。

2) Does kafka based storage for offsets is repicated across all brokers? Or it is maintained on only one broker?

Kafka 将消费者偏移量存储在名为 __consumer_offsets 的内部主题中,默认情况下其复制因子设置为 3,有 50 个分区。所以它被复制到 3 个经纪人。您可以在 broker config 中找到有关其配置的更多信息,它们以 offsetoffsets 开头。

关于java - Kafka 使用高级消费者 API 抵消提交请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29909179/

相关文章:

java - 卡夫卡流: how to convert data types?

java - 同一 IntelliJ 项目中的 Kafka 消费者和生产者

gradle - Scaladocs构建失败kafka

apache-kafka - Kafka JSON 控制台生产者

java - Java 初学者编程 - 在对话框 (JOptionPane) 框中使用另一种方法的整数

java - 针对不同tomcat war应用的多个logback.xml配置文件

java - XML 解析 - 搜索特定元素

Java:如何使用 Vertx 按顺序运行连续的异步调用?

java - 使用子字符串进行随机化

Python:模拟 Kafka 进行集成测试