go - 无法使用来自本地运行的 Kafka 服务器的消息,使用 Golang Sarama 包

标签 go apache-kafka telegram-bot sarama

我正在制作一个简单的 Telegram 机器人,它可以从本地 Kafka 服务器读取消息并将其打印到聊天中。 zookeeper 和 kafka 服务器配置文件都是默认值。控制台消费者作品。当我尝试使用 Golang Sarama 包从代码中获取消息时,问题就出现了。在我添加这些行之前:

case err := <-pc.Errors(): log.Panic(err)

程序只打印一次消息,之后就会停止。 现在它 panic 地将它打印到日志中: kafka: error while consuming test1/0: kafka: broker not connected

代码如下:

    type kafkaResponse struct {
        telega  *tgbotapi.Message
        message []byte
    }

    type kafkaRequest struct {
        telega *tgbotapi.Message
        topic  string
    }    
    var kafkaBrokers = []string{"localhost:9092"}
    func main() {
                //channels for request response
                var reqChan = make(chan kafkaRequest)
                var respChan = make(chan kafkaResponse)

                //starting kafka client routine to listen to topic channnel
                go consumer(reqChan, respChan, kafkaBrokers)

                //bot thingy here
                bot, err := tgbotapi.NewBotAPI(token)
                if err != nil {
                    log.Panic(err)
                }
                bot.Debug = true
                log.Printf("Authorized on account %s", bot.Self.UserName)
                u := tgbotapi.NewUpdate(0)
                u.Timeout = 60
                updates, err := bot.GetUpdatesChan(u)
                for {
                    select {
                    case update := <-updates:
                        if update.Message == nil {
                            continue
                        }
                        switch update.Message.Text {

                        case "Topic: test1":
                            topic := "test1"
                            reqChan <- kafkaRequest{update.Message, topic}
                        }
                    case response := <-respChan:
                        bot.Send(tgbotapi.NewMessage(response.telega.Chat.ID, string(response.message)))
                    }

                }

这是 consumer.go:

 func consumer(reqChan chan kafkaRequest, respChan chan kafkaResponse, brokers []string) {
            config := sarama.NewConfig()
            config.Consumer.Return.Errors = true

            // Create new consumer
            consumer, err := sarama.NewConsumer(brokers, config)
            if err != nil {
                panic(err)
            }
            defer func() {
                if err := consumer.Close(); err != nil {
                    panic(err)
                }
            }()

            select {
            case request := <-reqChan:
                //get all partitions on the given topic
                partitionList, err := consumer.Partitions(request.topic)
                if err != nil {
                    fmt.Println("Error retrieving partitionList ", err)
                }

                initialOffset := sarama.OffsetOldest
                for _, partition := range partitionList {
                    pc, _ := consumer.ConsumePartition(request.topic, partition, initialOffset)

                    go func(pc sarama.PartitionConsumer) {
                        for {
                            select {
                            case message := <-pc.Messages():
                                respChan <- kafkaResponse{request.telega, message.Value}
                            case err := <-pc.Errors():
                                log.Panic(err)
                            }
                        }
                    }(pc)
                }
            }
        }

最佳答案

在代码中设置所有 PartitionConsumer 之后,您将关闭您的消费者

defer func() {
            if err := consumer.Close(); err != nil {
                panic(err)
            }
        }()

但是,文档指定您只应在所有 PartitionConsumers 关闭后关闭消费者。

// Close shuts down the consumer. It must be called after all child
// PartitionConsumers have already been closed.
Close() error

我建议您将 sync.WaitGroup 添加到函数 go func(pc sarama.PartitionConsumer) {

关于go - 无法使用来自本地运行的 Kafka 服务器的消息,使用 Golang Sarama 包,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47071314/

相关文章:

python - AIO卡夫卡 : Formerly working code now fails at send_and_wait

Avro 与 Protobuf 的性能指标

node.js - Telegram API 内联 request_contact

Go 锁定一片结构

go - 如何在 GO 中的 Conn 中编写响应(类型 Response)?

go - 避免为本地模块的递归依赖写入 "replace"

apache-kafka - Kafka在生产者与主题之间设置了压缩类型

javascript - Telegram Bot 付款 - 付款成功后显示收据

python-3.x - 我希望我的 Telegram Bot 等到用户使用远程机器人和回调处理程序应答

json - Go 嵌套 Json Marshall 或编码