go - 我如何确保我的消费者按顺序处理 kafka 主题中的消息,并且只处理一次?

标签 go apache-kafka confluent-platform

我以前从未使用过kafka。我有两个访问本地 kafka 实例的测试 Go 程序:一个读取器和一个写入器。我正在尝试调整我的生产者、消费者和 kafka 服务器设置以获得特定行为。

我的作家:

package main

import (
    "fmt"
    "math/rand"
    "strconv"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    rand.Seed(time.Now().UnixNano())

    topics := []string{
        "policymanager-100",
        "policymanager-200",
        "policymanager-300",
    }
    progress := make(map[string]int)
    for _, t := range topics {
        progress[t] = 0
    }

    producer, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost",
        "group.id":          "0",
    })
    if err != nil {
        panic(err)
    }
    defer producer.Close()

    fmt.Println("producing messages...")
    for i := 0; i < 30; i++ {
        index := rand.Intn(len(topics))
        topic := topics[index]
        num := progress[topic]
        num++
        fmt.Printf("%s => %d\n", topic, num)
        msg := &kafka.Message{
            Value: []byte(strconv.Itoa(num)),
            TopicPartition: kafka.TopicPartition{
                Topic: &topic,
            },
        }
        err = producer.Produce(msg, nil)
        if err != nil {
            panic(err)
        }
        progress[topic] = num
        time.Sleep(time.Millisecond * 100)
    }
    fmt.Println("DONE")
}

我本地kafka上存在三个topic:policymanager-100、policymanager-200、policymanager-300。它们每个只有 1 个分区,以确保所有消息都按 kafka 接收它们的时间排序。我的作者将随机选择其中一个主题并发布一条消息,其中包含一个仅针对该主题递增的数字。当它完成运行时,我希望队列看起来像这样(为了易读性缩短了主题名称):

100: 1 2 3 4 5 6 7 8 9 10 11
200: 1 2 3 4 5 6 7
300: 1 2 3 4 5 6 7 8 9 10 11 12

到目前为止一切顺利。我正在尝试配置一些东西,以便可以启动任意数量的消费者并按顺序使用这些消息。 “按顺序”是指在消息 1 完成(而不是刚刚开始)之前,任何消费者都不应收到主题 100 的消息 2。如果正在处理主题 100 的消息 1,则消费者可以自由使用当前没有正在处理消息的其他主题。如果某个主题的消息已发送给消费者,则整个主题应“锁定”,直到超时假设消费者失败或消费者提交消息,然后主题被“解锁”以发出下一条消息可供消费。

我的读者:

package main

import (
    "fmt"
    "time"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    count := 2
    for i := 0; i < count; i++ {
        go consumer(i + 1)
    }
    fmt.Println("cosuming...")
    // hold this thread open indefinitely
    select {}
}

func consumer(id int) {
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers":  "localhost",
        "group.id":           "0", // strconv.Itoa(id),
        "enable.auto.commit": "false",
    })
    if err != nil {
        panic(err)
    }

    c.SubscribeTopics([]string{`^policymanager-.+$`}, nil)
    for {
        msg, err := c.ReadMessage(-1)
        if err != nil {
            panic(err)
        }

        fmt.Printf("%d) Message on %s: %s\n", id, msg.TopicPartition, string(msg.Value))
        time.Sleep(time.Second)
        _, err = c.CommitMessage(msg)
        if err != nil {
            fmt.Printf("ERROR commiting: %+v\n", err)
        }
    }
}

根据我目前的理解,我可能实现这一目标的方法是正确设置我的消费者。我已经尝试过这个程序的许多不同变体。我试过让我所有的 goroutines 共享同一个消费者。我试过为每个 goroutine 使用不同的 group.id。这些都不是获得我所追求的行为的正确配置。

发布的代码所做的是一次清空一个主题。尽管有多个 goroutine,该过程将读取全部 100,然后移动到 200,然后移动到 300,实际上只有一个 goroutine 会完成所有读取。当我让每个 goroutine 有不同的 group.id 时,消息会被多个 goroutine 读取,我想阻止这种情况。

我的示例消费者只是简单地用 goroutines 分解,但是当我开始将这个项目用于我的工作用例时,我需要它来跨多个不会相互通信的 kubernetes 实例工作,所以使用任何东西一旦 2 个 kube 上有 2 个实例,goroutine 之间的交互就不会起作用。这就是为什么我希望让 kafka 做我想要的看门人。

最佳答案

一般来说,你不能。即使您有一个消费者消费了该主题的所有分区,这些分区也会以不确定的顺序消费,并且无法保证您在所有分区中的总排序。

试试 Keyed Messages,认为您可能会发现它对您的用例很有用。

关于go - 我如何确保我的消费者按顺序处理 kafka 主题中的消息,并且只处理一次?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54336650/

相关文章:

postgresql - Debezium Kafka 连接。十进制模式错误

sql-server - Golang 连接到 SQL Server 错误 - "TLS Handshake failed: Cannot read handshake packet: EOF"

json - 将变量类型的 json 转换为字符串

go - 为什么这个版本标签会导致错误?

node.js - 如何在Nodejs中检查Kafka主题是否存在

java - 卡夫卡经纪人以随机间隔崩溃

java - Kafka KStream - 显着的启动延迟

go - 如何在Go中从公用文件夹发回图像?

hadoop - 合流 HDFS 连接器 : How can I read from the latest offset when there are no hdfs files?

docker - Kubernetes 上的 Kafka 图表 : Simple test string produce + consume