go - Sarama 无法生成 Amazon MSK 版本 2.3.1 的消息

标签 go sarama aws-msk amazon-msk

我正在使用sarama golang library用于将消息推送至 Amazon MSK 。到目前为止,我使用的 msk 版本 2.2.1 我的代码工作正常,但现在 msk 版本已更改为 2.3.1。现在,我无法将消息推送到主题。

Error:

Partition -1

Offset -1

Request was for a topic or partition that does not exist on this broker.

代码:

func getKafkaEventClient() (sarama.Client, error) {

    if !setupDone {
        return nil, errors.New("Invalid setup")
    }

    if kafkaEventClient != nil {
        return kafkaEventClient, nil
    }

    err := initKafkaEventClient()
    if err != nil {
        return nil, err
    }

    return kafkaEventClient, nil
}

func initKafkaEventClient() (err error) {
      config := sarama.NewConfig()
      config.Net.TLS.Enable = false
      config.Producer.Return.Successes = true
      config.Version = sarama.V0_10_0_0

      brokers := strings.Split(kafkaEventHost, ",") //split the host into brokers

      kafkaEventClient, err = sarama.NewClient(brokers, config)
      if err != nil {
         log.Println("initKafkaClient: failed to create new kafka client", err)
         return
      }
}

func PushMessageToKafka(message string) {
    client, err := getKafkaEventClient()
    if err != nil {
        return
    }

    producer, err := sarama.NewSyncProducerFromClient(kafkaEventClient)
    if err != nil {
    fmt.Println("PushMessageToKafka: failed to get producer", err)
    return
    }
    var msg sarama.ProducerMessage
    msg.Topic = "some_topic"
    msg.Value = sarama.StringEncoder("some_message")
    p, o, err := producer.SendMessage(&msg)

    fmt.Println("Partition", p)
    fmt.Println("Offset", o)

    if err != nil {
        fmt.Println("PushMessageToKafka: failed to push message to be displayed", err)
     }
}

我已将 sarama 版本也更改为 maxVersion config.Version = sarama.MaxVersion,但它不适用于 Amazon MSK 2.3.1。

请提供一些解决方案。

最佳答案

经过多次调试,找到了解决方案。 不是版本问题,实际上是返回客户端的代码问题

func getKafkaEventClient() (sarama.Client, error) {

    if !setupDone {
        return nil, errors.New("Invalid setup")
    }

    if kafkaEventClient != nil {
        return kafkaEventClient, nil
    }

    err := initKafkaEventClient()
    if err != nil {
        return nil, err
    }

    return kafkaEventClient, nil
}

这里if kafkaEventClient != nil则返回之前的客户端,这是错误的。对于每个客户端,如果代理/主机发生变化,那么我们必须创建一个新客户端,并且该客户端将能够找到我们想要推送消息的主题。如果我们获取旧客户端并将消息推送到存在于不同代理/主机中的主题,那么我们将收到如上所述的错误。

Error:

Partition -1

Offset -1

Request was for a topic or partition that does not exist on this broker.

我希望它能解决面临同样问题的人的问题。

关于go - Sarama 无法生成 Amazon MSK 版本 2.3.1 的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60275011/

相关文章:

c - 如何使用 "foreign function interface"从 Go 调用 C

go - 调用 tcp : lookup ip-x-x-xx. ec2.internal: 没有这样的主机

amazon-web-services - 如何在 AWS MSK 集群上将 auto.create.topics.enable 设置为默认配置

apache-kafka-connect - 有没有办法使用 MSK-connect API 更新连接器配置?

amazon-web-services - 使用 IAM 的 AWS MSK Spring Boot 应用程序示例

go - 除了 Gosched 还有什么?

go - 是否可以将 ZMQ 代理用作 "switch"?

go - 如何将嵌套结构中的字段设置为零值?

go - 如何从开始到特定偏移量消耗消息

json - 将 JSON 对象数组解析为单个文档