go - 无法使用segmentio的kafka-go连接到Confluence Kafka

标签 go apache-kafka confluent-kafka-go

我可以使用confluence cli 连接到confluence kafka 集群,但无法使用segmentio 的kafka-go 库。 我收到以下错误。

with SASL: SASL handshake failed: EOF

这是我在 go 中的函数

package consumer

import (
    "context"
    "fmt"
    "log"
    "os"
    "time"

    "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/sasl/plain"
)
func Consume(ctx context.Context) {
    // create a new logger that outputs to stdout
    // and has the `kafka reader` prefix
    l := log.New(os.Stdout, "kafka reader: ", 0)
    mechanism := plain.Mechanism{
        Username: "my-api-key",
        Password: "my-api-secret",
    }

    dialer := &kafka.Dialer{
        Timeout:       10 * time.Second,
        DualStack:     true,
        SASLMechanism: mechanism,
    }

    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{brokerAddress}, // brokerAddress given in confluent cloud cluster settings. 
        Topic:   []string{"steps"}[0],
        // assign the logger to the reader
        Logger: l,
        Dialer: dialer,
    })
    for {
        // the `ReadMessage` method blocks until we receive the next event
        msg, err := r.ReadMessage(ctx)
        if err != nil {
            panic("could not read message " + err.Error())
        }
        // after receiving the message, log its value
        fmt.Println("received: ", string(msg.Value))
    }
}

我尝试生成新 key 、使用我的帐户用户名和密码、减少分区,但没有任何效果。

最佳答案

看来您的服务器的 TLS 版本不被接受,您可以使用 MinVersion 强制 go-kafka 接受它:

dialer := &kafka.Dialer{
        Timeout:       10 * time.Second,
        DualStack:     true,
        SASLMechanism: mechanism,
        TLS: &tls.Config{
            MinVersion: tls.VersionTLS12,
        },
    }

关于go - 无法使用segmentio的kafka-go连接到Confluence Kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/77357291/

相关文章:

go - 如何将我的自定义 UnmarshalJSON 方法应用于嵌入式结构?

go - 使用换行符解析字符串然后分配给变量

java - 如何在 Spring Cloud Stream 的事务上下文中使用 MessageChannel?

docker - 当我的 go 构建成功时,如何修复 docker 构建失败的问题? Dockerfile 包含 go mod 下载

go - 检查IP地址是否在私网空间

unit-testing - 在 golang 中重置 http 处理程序以进行单元测试

elasticsearch - 使用 Kafka 和 ELK 堆栈进行集中式日志记录

spring - 使用DeadLetterPublishingRecoverer处理Spring-kafka错误

go - Confluence kafka go 软件包与 ubuntu 22.04 兼容吗?