java - 使用特定 ID 将架构添加到架构注册表

标签 java apache-kafka confluent-schema-registry

我们使用 Confluence Schema Registry 与 KafkaStreams 已经一年多了,一切都运行良好;直到昨天。

在 UAT 环境中,我们似乎删除了一个架构主题,并且我们的一个应用程序开始故障转移并显示消息

[ERROR] LogAndFailExceptionHandler - Exception caught during Deserialization, taskId: 0_13, topic: TOPIC_NAME, partition: 13, offset: 0 org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 1531

我检查了架构注册表,发现主题丢失了,并使用curl查询错误中列出的id 1531,例如:

curl -X GET http://SchemaRegistryHost:8081/schemas/ids/1531

回来了:

{"error_code":40403,"message":"Schema not found"}

我天真地尝试再次注册架构,没有考虑它,它起作用了,但注册架构的 ID 与之前的 1531 ID 不同。

我需要将架构注册到 ID 1531,因为主题中的现有消息已在魔术字节中包含该 ID 1531。

我检查了 API 文档 https://docs.confluent.io/current/schema-registry/docs/develop/api.html但没有看到任何为架构设置给定 ID 的内容。

有没有办法通过架构注册表将架构强制指定为特定 ID?

我知道一些备份解决方案,但我现在正在寻找一种修复方法,希望能够防止数据丢失或采取特殊措施来修复主题数据。

最佳答案

Is there anyway to force a schema to a specific Id with schema registry?

没有。

<小时/>

1531 的 ID 实际上并没有“消失”,顺便说一下,它只是在注册表中标记为已删除(使用 _schemas 主题查看它)。

<小时/>

据我所知,当您使用 KafkaAvroDeserializer 时,确实无法解决该错误。您必须使用 ByteArrayDeserializer,然后使用架构注册表客户端“修复”或“查找”正确的 ID,然后反序列化消息的其余部分。

另一个选项是重置您的消费者组,以便完全跳过这些消息,或者设置异常处理。 Handling bad messages using Kafka's Streams API

关于java - 使用特定 ID 将架构添加到架构注册表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55282944/

相关文章:

apache-spark - 使用 Kafka 和 Spark Streaming 为 Web 应用程序提供服务

java - 在 Spring Cloud Stream 中使用 PollableMessageSource 输入时如何使用 avro native 解码器?

java - 使用 KafkaListener 从 Kafka 消费一次最少 N 条消息

apache-kafka - 单个主题内的 Kafka 安全性 : allowing only certain users to read certain messages,

java - 使用java代码读取rtmp直播视频数据

java - 通过二维数组中随机生成的数字打印字符数

java - 我如何告诉 KafkaAvroSerializer 与架构注册表的连接必须通过代理?

python - 将包添加到 memgraph 转换

java - 调整垃圾回收以实现低延迟

java - 如何通过环境变量设置 Java 的最小和最大堆大小?