go - 从 Goroutine 重新连接到 kafka

标签 go apache-kafka concurrency

我想使用 Golang 向 kafka 写一条消息。我有以下代码。

package kafkaK

import (
    "context"
    "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/snappy"
    "time"
)

var writer *kafka.Writer

func Configure() (w *kafka.Writer, err error) {
    dialer := &kafka.Dialer{
        Timeout:  10 * time.Second,
        ClientID: "123",
    }

    config := kafka.WriterConfig{
        Brokers:          []string{"localhost:9092"},
        Topic:            "test",
        Balancer:         &kafka.LeastBytes{},
        Dialer:           dialer,
        WriteTimeout:     10 * time.Second,
        ReadTimeout:      10 * time.Second,
        CompressionCodec: snappy.NewCompressionCodec(),
    }
    w = kafka.NewWriter(config)
    writer = w
    return w, nil
}

func Push(parent context.Context, key, value []byte) (err error) {
    message := kafka.Message{
        Key:   key,
        Value: value,
        Time:  time.Now(),
    }
    return writer.WriteMessages(parent, message)
}

我在单独的 Goroutines 中向 kafka 写信。

func sendMessage(message string) {
    err := kafkaK.Push(context.Background(), nil, []byte(message))
    if err != nil {
        fmt.Println(err)
    }
}

调用看起来像

go sendMessage("message #" + strconv.Itoa(i))
  1. 碰巧kafka不可用,我想重新连接它。如果不同的 Goroutines 使用同一个 kafka 对象,并且只有它们知道发生了错误并可以发起重新连接,如何正确地做到这一点。也许还有其他方法?
  2. 哪个库更适合与 kafka 一起使用?

我认为可以使用 channel 或上下文,但我是新手,所以我不太明白如何实现它

最佳答案

首先我想指出您在 kafka-go 中使用了已弃用的接口(interface)。 This文档指定您可以使用 kafkago.Writer{} 提供连接和写入 kafka 集群的配置。现在回答您的问题:

It happens that kafka is unavailable, and I would like to reconnect to it. How to do it correctly if different Goroutines use the same kafka object, and only they know that an error has occurred and can initiate reconnection. Perhaps there is some other approach?

使用像 kafka-go 这样的高级库,您不必担心处理连接或暂时性错误,例如 kafka 集群暂时不可用。当您在 kafka writer 上使用 WriteMessages 方法时,它会缓冲您的消息,并在使用您提供的配置连接到代理后定期刷新。因此,只要配置正确,kafka-go 就会为您处理剩下的事情。

Which library is better to use for working with kafka?

kafka-go 绝对广泛 used并积极维护对 kafka 的读写。

另外,回到你的例子,这就是我重构它的方式:

package kafkak

import (
    "context"
    "fmt"
    "github.com/segmentio/kafka-go"
    "sync"
    "time"
)

// Define an interface so you can mock it for testing.
type kproducer interface {
    WriteMessages(ctx context.Context, msgs ...kafka.Message) error
}

type KafkaK struct {
    p      kproducer
    dataCh chan kafka.Message // Write messages to this channel so a separate go routine can process them
    errCh  chan error // Fetch errors from this channel
    stopCh chan struct{}
}

func NewKafkaK(p kproducer) *KafkaK {
    k := &KafkaK{
        p:      p,
        dataCh: make(chan kafka.Message),
        errCh:  make(chan error),
        stopCh: make(chan struct{}),
    }

    go k.processLoop()

    return k
}

func (k *KafkaK) Push(key, value string) {
    m := kafka.Message{
        Key:   []byte(key),
        Value: []byte(value),
        Time:  time.Now(),
    }

    k.dataCh <- m
}

func (k *KafkaK) processLoop() {
    for {
        select {
        case msg := <-k.dataCh:
            ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second*5))
            err := k.p.WriteMessages(ctx, msg)
            k.errCh <- err
        case <-k.stopCh:
            break
        }
    }
}

func (k *KafkaK) Stop() {
    close(k.stopCh)
    close(k.errCh)
}

func (k *KafkaK) Errs() <-chan error {
    return k.errCh
}

关于go - 从 Goroutine 重新连接到 kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76167316/

相关文章:

go - 尝试从Redis中的列表进行LPOP时类型错误的操作

java - 来自单个主主题的多个流

Go Golang - 嵌入类型和 "len/range"

go - 无法使用segmentio的kafka-go连接到Confluence Kafka

Python Kafka消费者读取已读消息

java - 将字符串数组发送到 Kafka 主题

multithreading - ConcurrentBag<T> 中的 Parallel.ForEach 是线程安全的

java - 是否有旨在调试并发软件的 JVM?

sql - Derby中带有WHERE子句的SQL UPDATE语句的原子性

function - 检查 Go 中首先完成的任务是什么?