go - 如何设置消费者从Golang Kafka 10中的特定偏移量开始

标签 go apache-kafka sarama

我的需要是让生产者从崩溃前处理的最后一条消息开始。幸运的是,我遇到的是只有一个主题、一个分区和一个消费者的情况。

为此,我尝试了 https://github.com/Shopify/sarama但它似乎还没有。 我现在正在使用 https://godoc.org/github.com/bsm/sarama-cluster ,这允许我提交每个消息偏移量。

我无法检索最后提交的偏移量 我不知道如何制作 sarama consumer从所述偏移量开始。到目前为止我发现的唯一参数是 Config.Producer.Offsets.Initial

  1. 如何检索最后提交的偏移量?
  2. 如何让消费者从最后一个offset已经提交的消息开始? OffsetNewest 将使它从最后生成的消息开始,而不是消费者最后处理的消息。
  3. 是否可以只使用 Shopify/sarama 而不是 bsm/sarama-cluster 来做到这一点?

提前致谢

附言我使用的是 Kafka 10.0,所以偏移量存储在 kafka 中,而不是在 zookeeper 中。

编辑 1: 部分解决方案:获取自 sarama.OffsetOldest 以来的所有消息并跳过所有消息,直到我们找到未处理的消息。

最佳答案

如果已经为分区保存了偏移量,sarama-cluster 将从该偏移量恢复消费。 Config.Producer.Offsets.Initial 选项仅在不存在已保存的偏移量时使用(首次为消费者组运行)。

您可以通过在 main() 函数的开头添加以下行来验证这一点:

sarama.Logger = log.New(os.Stdout, "sarama: ", log.LstdFlags)

然后你会在输出中看到如下内容:

集群/消费者 CID-17db1be4-a162-411c-a106-4d198191176a 从 12 消费 sample/0

其中的 12 是 Sarama 将从该分区 (sample/0) 开始的偏移量。

关于go - 如何设置消费者从Golang Kafka 10中的特定偏移量开始,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41983936/

相关文章:

java - kafka_2.11-0.8.2.1 消费者再平衡

go - 使用Sarama单独或批量提交消息-Go的kafka客户端

go - Golang Consumer连接Kafka后延迟接收Kafka消息

go - 打印时如何取消引用字段?

docker - 转到 Docker 容器失败 : "Exit Code 1"

.net - Kafka (.NET) 中的消息头

go - 重消费Kafka消息的可能原因

go - 如何在 Amazon Linux 中更新 Go 应用程序

Golang 前置数组

python - LEADER_NOT_AVAILABLE - docker 容器