go - 消息不以魔术字节开头

标签 go apache-kafka avro confluent-schema-registry sarama

我正在尝试使用 将 avro 编码数据生成到 kafka 主题中/linkedin/goavro 在 Go 中打包。目标是能够使用不同的客户端来使用主题。

首先,我将架构注册如下:

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "{\"name\":\"test_topic2\",\"type\":\"record\", \"fields\":[{\"name\":\"user\",\"type\":\"string\"},{\"name\":\"password\",\"size\":10,\"type\":\"string\"}]}"}' http://localhost:8081/subjects/test_topic2-value/versions

然后我创建 avro 数据,使用 Go 生成和使用它。

package main

import (

    "github.com/Shopify/sarama"
    "github.com/linkedin/goavro"
    "fmt"

)
const (
    brokers = "localhost:9092"
    topic     = "test_topic2"
)

const loginEventAvroSchema = `{"name":"test_topic2","type":"record", "fields":[{"name":"user","type":"string"},{"name":"password","size":10,"type":"string"}]}`

func main() {

// Create Message

codec, err := goavro.NewCodec(loginEventAvroSchema)
if err != nil {
    panic(err)
}

m := map[string]interface{}{
    "user": "pikachu", "password": 231231,
}

single, err := codec.SingleFromNative(nil, m)
if err != nil {
    panic(err)
}


// Producer
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true
    config.Producer.Return.Successes = true

    config.Version = sarama.V2_4_0_0
    //get broker
    cluster, err := sarama.NewSyncProducer(brokers, config)
    if err != nil {
        panic(err)
    }

    defer func() {
        if err := cluster.Close(); err != nil {
            panic(err)
        }
    }()

    msg := &sarama.ProducerMessage{
        Topic: topic,
        Value: sarama.StringEncoder(single),
    }

    cluster.SendMessage(msg)

// Consumer 

    clusterConsumer, err := sarama.NewConsumer(brokers, config)
    if err != nil {
        panic(err)
    }

    defer func() {
        if err := clusterConsumer.Close(); err != nil {
            panic(err)
        }
    }()

    msgK, _ := clusterConsumer.ConsumePartition(topic, 0, sarama.OffsetOldest)

    for {

        q := <-msgK.Messages()

        native, _, err := codec.NativeFromSingle([]byte(q.Value))
        if err != nil {
            fmt.Println(err)
        }

        fmt.Println(native)

}

此代码工作正常,我可以成功地在 kafka 主题中生成和使用消息。

现在我尝试使用来自 python avro-consumer 的主题:

from confluent_kafka import KafkaError
from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError


c = AvroConsumer({
    'bootstrap.servers': 'localhost',
    'group.id': 'groupid',
    'schema.registry.url': 'http://localhost:8081',
    'auto.offset.reset': 'earliest'})


c.subscribe(['test_topic2'])

while True:
    try:
        msg = c.poll(10)

    except SerializerError as e:
        print("Message deserialization failed for {}: {}".format(msg, e))
        break

    if msg is None:
        continue

    if msg.error():
        print("AvroConsumer error: {}".format(msg.error()))
        continue

    print(msg.value(), msg.key())

c.close()

但我收到以下错误:
confluent_kafka.avro.serializer.SerializerError: Message deserialization failed for message at test_topic2 [0] offset 1: message does not start with magic byte

我认为我在 Go 生产者部分遗漏了一些东西,如果有人能分享他/她关于如何解决这个问题的经验,我将不胜感激。

最佳答案

goavro不使用模式注册表。

另外,您正在使用 StringEncoder ,我假设它只输出一个字符串 slice 而不是 Avro 字节

StringEncoder implements the Encoder interface for Go strings so that they can be used as the Key or Value in a ProducerMessage.



FWIW,我建议用 kafka-avro-console-consumer 测试消费者,如果你有的话

关于go - 消息不以魔术字节开头,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60599237/

相关文章:

http - Golang重写http请求体

apache-kafka - Kafka Streams reduceByKey 与 leftJoin

java - 解码原始字节数组后如何更改特定字段值?

java - 如何在 avro 模式中创建包含字符串数组的对象?

go - 如何在很短的间隔内/同时进行多个查询

go - 为什么编译器在这个实例中提示一个未使用的变量(当它被 fmt.Fprintf 使用时)?

node.js - Strimzi - 连接外部客户端

elasticsearch - 卡夫卡 : client has run out of available brokers to talk to

C# .net 核心 Confluent.Kafka Avro 反序列化

go - 星号在 "Go"中有什么作用?