apache-kafka - 带有 kafka-avro-console-consumer 的未知魔法字节

标签 apache-kafka avro confluent-platform confluent-schema-registry

我一直在尝试将 kafka-avro-console-consumer 从 Confluent 连接到我们遗留的 Kafka 集群,该集群是在没有 Confluent Schema Registry 的情况下部署的。
我使用以下属性显式提供了架构:

kafka-console-consumer --bootstrap-server kafka02.internal:9092 \
    --topic test \
    --from-beginning \
    --property key.schema='{"type":"long"}' \
    --property value.schema='{"type":"long"}'

但我收到了“未知的魔法字节!”错误 org.apache.kafka.common.errors.SerializationException
是否可以使用 Confluent kafka-avro-console-consumer 消费来自 Kafka 的 Avro 消息,这些消息未使用 Confluent 的 AvroSerializer 和 Schema Registry 序列化?

最佳答案

Confluent Schema Registry 序列化器/反序列化器使用 wire format它在消息的初始字节中包含有关架构 ID 等的信息。

如果您的消息没有使用 Schema Registry 序列化程序进行序列化,那么您将无法使用它反序列化它,并且会得到 Unknown magic byte!错误。

因此,您需要编写一个消费者来提取消息,使用您的 Avro avsc 模式进行反序列化,然后假设您想保留数据,使用 Schema Registry serializer 重新序列化它。

编辑:我最近写了一篇文章更深入地解释了整个事情:https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained

关于apache-kafka - 带有 kafka-avro-console-consumer 的未知魔法字节,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52399417/

相关文章:

offset - 卡夫卡消费者补偿最大值?

python - 从消息中心上的主题检索消息

apache-kafka - Kafka在多个租户之间平衡负载

hive - 如何将数据从kafka发送到hive

hadoop - 无法描述使用 avro serde 创建的 Hive 表

hadoop - Apache Pig : java. lang.OutOfMemoryError:Java 堆空间

java - 如何在 Avro 中定义 LogicalType。 ( java )

apache-kafka - Kafka Connect Confluence S3 Sink 连接器 : Class io. confluence.connect.avro.AvroConverter 找不到

postgresql - Kafka Connect - JSON 转换器 - JDBC 接收器连接器 - 列类型 JSON

ssl - 如何在关闭 TLS 对等验证的情况下使用 Kafka