go - 在 go 中使用 Kafka Avro 消息

标签 go apache-kafka avro kafka-consumer-api

我正在尝试使用 avro 格式的 Kafka 消息,但我无法在 Go 中将消息从 avro 解码为 json。

我使用的是 Confluent 平台 (3.0.1)。例如,我生成如下 avro 消息:

kafka-avro-console-producer --broker-list localhost:9092 --topic test --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
{"f1":"message1"}
{"f1":"message2"}

现在我使用 go Kafka 库:sarama 来消费消息。纯文本消息工作正常。 Avro 消息必须被解码。我发现了不同的库:github.com/linkedin/goavro,github.com/elodina/go-avro

但在解码后我得到一个没有值的 json(两个库):

{"f1":""}

果阿罗:

avroSchema := `
{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}
`
codec, err := goavro.NewCodec(avroSchema)
if err != nil {
    log.Fatal(err)
}
bb := bytes.NewBuffer(msg.Value)
decoded, err := codec.Decode(bb)
log.Println(fmt.Sprintf("%s", decoded))

去阿罗:

schema := avro.MustParseSchema(avroSchema)
reader := avro.NewGenericDatumReader()
reader.SetSchema(schema)
decoder := avro.NewBinaryDecoder(msg.Value)
decodedRecord := avro.NewGenericRecord(schema)
log.Println(decodedRecord.String())

msg = sarama.ConsumerMessage

最佳答案

The first byte is a magic byte (0). The following 4 bytes are the avro schema ID

这只有在您使用 Confluent 架构注册表时才真正有用。

关于go - 在 go 中使用 Kafka Avro 消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40548909/

相关文章:

google-app-engine - 继续 Google App Engine 返回错误 : API error 1 (mail: INTERNAL_ERROR): Internal error

apache-kafka - 一个 Kafka 消费者如何从多个分区读取数据?

docker - kafka-zookeeper无法在docker-compose中连接

xml - 有没有办法将目录中的一些 XML 文件转换为 AVRO 文件?

hadoop - 具有纯文本输入和 avro 输出的 mapreduce 作业

go - reader.ReadLine() 在 Scanner.Scan() 调用后不会前进

go - 如何保证Golang channel 等待数据,Stdin没有数据时程序不终止

go - 为什么没有检测到 Go?

apache-spark - 在 kafka 流上使用 Spark 流作业

avro - Apache Avro 架构示例和文档