我们使用 Confluence Schema Registry 与 KafkaStreams 已经一年多了,一切都运行良好;直到昨天。
在 UAT 环境中,我们似乎删除了一个架构主题,并且我们的一个应用程序开始故障转移并显示消息
[ERROR] LogAndFailExceptionHandler - Exception caught during Deserialization, taskId: 0_13, topic: TOPIC_NAME, partition: 13, offset: 0 org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 1531
我检查了架构注册表,发现主题丢失了,并使用curl查询错误中列出的id 1531,例如:
curl -X GET http://SchemaRegistryHost:8081/schemas/ids/1531
回来了:
{"error_code":40403,"message":"Schema not found"}
我天真地尝试再次注册架构,没有考虑它,它起作用了,但注册架构的 ID 与之前的 1531 ID 不同。
我需要将架构注册到 ID 1531,因为主题中的现有消息已在魔术字节中包含该 ID 1531。
我检查了 API 文档 https://docs.confluent.io/current/schema-registry/docs/develop/api.html但没有看到任何为架构设置给定 ID 的内容。
有没有办法通过架构注册表将架构强制指定为特定 ID?
我知道一些备份解决方案,但我现在正在寻找一种修复方法,希望能够防止数据丢失或采取特殊措施来修复主题数据。
最佳答案
Is there anyway to force a schema to a specific Id with schema registry?
没有。
<小时/>1531 的 ID 实际上并没有“消失”,顺便说一下,它只是在注册表中标记为已删除(使用 _schemas
主题查看它)。
据我所知,当您使用 KafkaAvroDeserializer 时,确实无法解决该错误。您必须使用 ByteArrayDeserializer,然后使用架构注册表客户端“修复”或“查找”正确的 ID,然后反序列化消息的其余部分。
另一个选项是重置您的消费者组,以便完全跳过这些消息,或者设置异常处理。 Handling bad messages using Kafka's Streams API
关于java - 使用特定 ID 将架构添加到架构注册表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55282944/