kotlin - 注册 Avro 架构 : "string" RestClientException: Schema being registered is incompatible with an earlier schema; 时出错

标签 kotlin apache-kafka avro confluent-schema-registry

我正在尝试使用 Avro 架构向我的经纪人发送消息,但“我总是收到错误消息:

2020-02-01 11:24:37.189 [nioEventLoopGroup-4-1] ERROR Application - Unhandled: POST - /api/orchestration/ org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "string" Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409



这是我的码头容器:
 connect:
    image: confluentinc/cp-kafka-connect:5.4.0
    hostname: confluentinc-connect
    container_name: confluentinc-connect
    depends_on:
      - zookeeper
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: confluentinc-connect
      CONNECT_CONFIG_STORAGE_TOPIC: confluentinc-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: confluentinc-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: confluentinc-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "true"
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/extras"

我的制作人(用科林写的)
 val prop: HashMap<String, Any> = HashMap()
    prop[BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
    prop[KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java.name
    prop[VALUE_SERIALIZER_CLASS_CONFIG] = KafkaAvroSerializer::class.java.name
    prop[SCHEMA_REGISTRY_URL] = schemaUrl
    prop[ENABLE_IDEMPOTENCE_CONFIG] = idempotence
    prop[ACKS_CONFIG] = acks.value
    prop[RETRIES_CONFIG] = retries
    prop[MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION] = requestPerConnection
    prop[COMPRESSION_TYPE_CONFIG] = compression.value
    prop[LINGER_MS_CONFIG] = linger
    prop[BATCH_SIZE_CONFIG] = batchSize.value

    return KafkaProducer(prop)

我的 Avro 架构:
{
    "type": "record",
    "namespace": "com.rjdesenvolvimento",
    "name": "create_client_value",
    "doc": "Avro Schema for Kafka Command",
    "fields": [
        {
            "name": "id",
            "type": "string",
            "logicalType": "uuid",
            "doc": "UUID for indentifaction command"
        },
        {
            "name": "status",
            "type": {
                "name": "status",
                "type": "enum",
                "symbols": [
                    "Open",
                    "Closed",
                    "Processing"
                ],
                "doc": "Can be only: Open, Closed or Processing"
            },
            "doc": "Status of the command"
        },
        {
            "name": "message",
            "type": {
                "type": "record",
                "name": "message",
                "doc": "Avro Schema for insert new client",
                "fields": [
                    {
                        "name": "id",
                        "type": "string",
                        "logicalType": "uuid",
                        "doc": "UUID for indentifaction client transaction"
                    },
                    {
                        "name": "active",
                        "type": "boolean",
                        "doc": "Soft delete for client"
                    },
                    {
                        "name": "name",
                        "type": "string",
                        "doc": "Name of the client"
                    },
                    {
                        "name": "email",
                        "type": "string",
                        "doc": "Email of the client"
                    },
                    {
                        "name": "document",
                        "type": "string",
                        "doc": "CPF or CPNJ of the client"
                    },
                    {
                        "name": "phones",
                        "doc": "A list of phone numbers",
                        "type": {
                            "type": "array",
                            "items": {
                                "name": "phones",
                                "type": "record",
                                "fields": [
                                    {
                                        "name": "id",
                                        "type": "string",
                                        "logicalType": "uuid",
                                        "doc": "UUID for indentifaction of phone transaction"
                                    },
                                    {
                                        "name": "active",
                                        "type": "boolean",
                                        "doc": "Soft delete for phone number"
                                    },
                                    {
                                        "name": "number",
                                        "type": "string",
                                        "doc": "The phone number with this regex +xx xx xxxx xxxx"
                                    }
                                ]
                            }
                        }
                    },
                    {
                        "name": "address",
                        "type": "string",
                        "logicalType": "uuid",
                        "doc": "Adrres is an UUID for a other address-microservice"
                    }
                ]
            }
        }
    ]
}

还有我的帖子:
{       
      "id" : "9ec818da-6ee0-4634-9ed8-c085248cae12",
        "status" : "Open",
        "message": {
            "id" : "9ec818da-6ee0-4634-9ed8-c085248cae12",
            "active" : true,
             "name": "name",
             "email": "email@com",
             "document": "document",
             "phones": [
                 {
                     "id" : "9ec818da-6ee0-4634-9ed8-c085248cae12",
                        "active" : true,
                     "number": "+xx xx xxxx xxxx"
                 },
                    {
                     "id" : "9ec818da-6ee0-4634-9ed8-c085248cae12",
                        "active" : true,
                     "number": "+xx xx xxxx xxxx"
                 }
             ],
             "address": "9ec818da-6ee0-4634-9ed8-c085248cae12"  
        }   
}

我究竟做错了什么?
github项目:https://github.com/rodrigodevelms/kafka-registry

更新 =====

简要地:
我没有使用 Gradle Avro 插件生成我的类。
在这个例子中,我的 POST 发送了一个 Client 对象。在服务中,它组装了一个命令类型的对象,如下所示:

id:相同的客户端ID

状态:开放

消息:发送的 POST。

所以我把它发送到 KAFKA,在连接(jdbc sink postgres)中,我只将消息(客户端)的属性作为 fields.whitelist 放入,我没有得到命令 ID 或状态。
on github the only classes that matter to understand the code are:

1 -https://github.com/rodrigodevelms/kafka-registry/blob/master/kafka/src/main/kotlin/com/rjdesenvolvimento/messagebroker/producer/Producer.kt

2 - https://github.com/rodrigodevelms/kafka-registry/blob/master/kafka/src/main/kotlin/com/rjdesenvolvimento/messagebroker/commnad/Command.kt

3 - https://github.com/rodrigodevelms/kafka-registry/blob/master/src/client/Controller.kt

4 -https://github.com/rodrigodevelms/kafka-registry/blob/master/src/client/Service.kt

5 - docker-compose.yml, insert-client-value.avsc, postgresql.json,



如果我将 avro 方案的兼容模式设置为“无”,我可以发送消息,但会显示一些未知字符,如下图所示。

enter image description here

最佳答案

我怀疑您正在尝试做多件事,并且在之前的尝试之后您没有清理状态。您不应该在全新安装中遇到该错误

Schema being registered is incompatible with an earlier schema



您的数据发生了变化,注册表中的架构与您发送的架构不兼容。

您可以向 http://registry:8081/subjects/[name]/ 发送 HTTP DELETE 请求删除架构的所有版本,然后您可以重新启动连接器

关于kotlin - 注册 Avro 架构 : "string" RestClientException: Schema being registered is incompatible with an earlier schema; 时出错,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60018375/

相关文章:

android - 从列表中设置ContentView (Kotlin)

java - 如何在 kafka 中初始化 kafka ConsumerRecords<String,String> 进行测试

hadoop - Flume HDFS 接收器未从 Kafka channel 在 hdfs 中创建文件

java - 解析文本行并添加到 MySQL

android - Kotlin 创建一个 snackbar

arrays - 数组作为 Kotlin 中的通用类型

apache-kafka - 在 Windows 中启动 Confluent Schema Registry

apache-kafka - Kafka AVRO - 从长到日期时间的转换

android - 如何修复无法在没有参数的情况下调用私有(private)的问题

apache-kafka - **Kafka** 跨区域数据中心之间的双向同步