go - 如何通过 Kafka 生产者发送 Protobuf 消息

标签 go protocol-buffers kafka-producer-api

我正在使用 Sarama 库通过生产者发送消息。 这允许我发送字符串。我的目标是发送 Protobuf 消息

msg := &sarama.ProducerMessage{
            Topic: *topic,
            Value: sarama.StringEncoder(content),
        }

这是我的示例原型(prototype)类

message Pixel {

    // Session identifier stuff
    int64 timestamp    = 1; // Milliseconds from the epoch
    string session_id  = 2; // Unique Identifier... for parent level0top
    string client_name = 3; // Client-name/I-key

    string ip = 10;
    repeated string ip_list = 11;
    string datacenter = 12;
    string proxy_type = 13;

能否提供一个示例,说明如何发送 protobuf 消息。

最佳答案

您需要使用 proto#Marshalsarama#ByteEncoder在生产者方面和proto#Unmarshal在消费者方面。


制作人:

            pixelToSend := &pixel.Pixel{SessionId: t.String()}
            pixelToSendBytes, err := proto.Marshal(pixelToSend)
            if err != nil {
                log.Fatalln("Failed to marshal pixel:", err)
            }

            msg := &sarama.ProducerMessage{
                Topic: topic,
                Value: sarama.ByteEncoder(pixelToSendBytes),
            }

消费者:

        receivedPixel := &pixel.Pixel{}
        err := proto.Unmarshal(msg.Value, receivedPixel)
        if err != nil {
            log.Fatalln("Failed to unmarshal pixel:", err)
        }

        log.Printf("Pixel received: %s", receivedPixel)

完整示例:

package main

import (
    pixel "example/pixel"
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/Shopify/sarama"
    "github.com/golang/protobuf/proto"
)

func main() {
    topic := "your-topic-name"
    brokerList := []string{"localhost:29092"}

    producer, err := newSyncProducer(brokerList)
    if err != nil {
        log.Fatalln("Failed to start Sarama producer:", err)
    }

    go func() {
        ticker := time.NewTicker(time.Second)
        for {
            select {
            case t := <-ticker.C:
                pixelToSend := &pixel.Pixel{SessionId: t.String()}
                pixelToSendBytes, err := proto.Marshal(pixelToSend)
                if err != nil {
                    log.Fatalln("Failed to marshal pixel:", err)
                }

                msg := &sarama.ProducerMessage{
                    Topic: topic,
                    Value: sarama.ByteEncoder(pixelToSendBytes),
                }

                producer.SendMessage(msg)
                log.Printf("Pixel sent: %s", pixelToSend)
            }
        }

    }()

    signals := make(chan os.Signal, 1)
    signal.Notify(signals, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)

    partitionConsumer, err := newPartitionConsumer(brokerList, topic)
    if err != nil {
        log.Fatalln("Failed to create Sarama partition consumer:", err)
    }

    log.Println("Waiting for messages...")

    for {
        select {
        case msg := <-partitionConsumer.Messages():
            receivedPixel := &pixel.Pixel{}
            err := proto.Unmarshal(msg.Value, receivedPixel)
            if err != nil {
                log.Fatalln("Failed to unmarshal pixel:", err)
            }

            log.Printf("Pixel received: %s", receivedPixel)
        case <-signals:
            log.Print("Received termination signal. Exiting.")
            return
        }
    }
}

func newSyncProducer(brokerList []string) (sarama.SyncProducer, error) {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    // TODO configure producer

    producer, err := sarama.NewSyncProducer(brokerList, config)
    if err != nil {
        return nil, err
    }

    return producer, nil
}

func newPartitionConsumer(brokerList []string, topic string) (sarama.PartitionConsumer, error) {
    conf := sarama.NewConfig()
    // TODO configure consumer
    consumer, err := sarama.NewConsumer(brokerList, conf)
    if err != nil {
        return nil, err
    }

    partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetOldest)
    if err != nil {
        return nil, err
    }

    return partitionConsumer, err
}

关于go - 如何通过 Kafka 生产者发送 Protobuf 消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58842792/

相关文章:

java - Google Protocol Buffer 适用于 Windows,但不适用于 Linux

python - 在不创建描述符的情况下解析 .proto 文件

django - Kafka Python 生产者与 django Web 应用程序集成

apache-kafka - 生产者和消费者是否需要指定partition

go - 在 Go 中以编程方式运行测试

使用 Glide - vendor 不工作

go - 使用 Go 使用 http/2 连接到 Alexa 语音服务时遇到问题

http - Go http 客户端超时与上下文超时

c# - Protocol Buffer ,让 C# 与 C++ 对话 : type issues and schema issues

java - 卡夫卡 : Alter number of partitions for a specific topic using java