go - 如何在 Golang Kafka 10 中获取 GroupID?

标签 go apache-kafka sarama

我正在使用 Kafka 10.0 和 https://github.com/Shopify/sarama . 我正在尝试获取消费者处理的最新消息的偏移量。

为此,我找到了方法 NewOffsetManagerFromClient(group string, client Client)需要组名。

如何获取消费者组名称?

offsets := make(map[int32]int64)

config := sarama.NewConfig()
config.Consumer.Offsets.CommitInterval = 200 * time.Millisecond
config.Version = sarama.V0_10_0_0

// config.Consumer.Offsets.Initial = sarama.OffsetNewest
cli, _ := sarama.NewClient(kafkaHost, config)
defer cli.Close()

offsetManager, _ := sarama.NewOffsetManagerFromClient(group, cli)
for _, partition := range partitions {
    partitionOffsetManager, _ := offsetManager.ManagePartition(topic, partition)
    offset, _ := partitionOffsetManager.NextOffset()

    offsets[partition] = offset
}
return offsets

我创建了一个消费者

consumer := sarama.NewConsumer(connections, config)

但我不知道如何创建消费者组并获取其组名。

最佳答案

您正在尝试创建自己的偏移量管理器以查找当前偏移量:

offsetManager, _ := sarama.NewOffsetManagerFromClient(group, cli)

类似地,消费您的主题消息的消费者必须使用相同的偏移量管理器,并且他们将使用特定的组 ID。使用该组 ID。

关于go - 如何在 Golang Kafka 10 中获取 GroupID?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41984572/

相关文章:

go - Kafka 0.11/Golang Sarama 版本支持

inheritance - 通过 Go 中的接口(interface)修改结构成员

database - 我是否需要对存储在数据库中的刷新 token 进行哈希处理?

Gorm 计数预加载字段

docker - 许可证主题的 Kafka Connect 复制因子

go - 在 sarama-cluster 中模拟 NewConsumer

function - 初始化函数字段

ruby-on-rails - 如何将Kafka客户端添加到Rails Docker部署中?

java - ZooKeeper 启动成功但不工作

go - 如何设置消费者从Golang Kafka 10中的特定偏移量开始