apache-kafka - 汇流的 Kafka Connect 支持复杂或嵌套的 json/schema

标签 apache-kafka apache-kafka-connect

只能使用 Confluent Kafka Connect 将简单对象插入到数据库中。不知道如何让这个支持复杂的 json/schema 结构。我不确定此功能是否可用。有一个类似的问题here大约一年前问过,但直到现在还没有回答。请帮忙。

最佳答案

Kafka Connect 确实支持复杂的结构,包括 Struct , Map , 和 Array .通常只有源连接器需要这样做,因为接收器连接器传递值并且只需要使用它们。 This documentation描述了建立 Schema 的基础知识描述 Struct 的对象,然后创建一个 Struct遵守该模式的实例。在这种情况下,示例结构只是一个平面结构。

但是,您可以轻松添加 Struct 类型的字段。用另一个 Schema 定义的实例。实际上,它只是将这个简单的模式分层到结构中的多个级别:

Schema addressSchema = SchemaBuilder.struct().name(ADDRESS)
    .field("number", Schema.INT16_SCHEMA)
    .field("street", Schema.STRING_SCHEMA)
    .field("city", Schema.STRING_SCHEMA)
    .build();
Schema personSchema = SchemaBuilder.struct().name(NAME)
    .field("name", Schema.STRING_SCHEMA)
    .field("age", Schema.INT8_SCHEMA)
    .field("admin", new SchemaBuilder.boolean().defaultValue(false).build())
    .field("address", addressSchema)
    .build();

Struct addressStruct = new Struct(addressSchema)
    .put("number", 100)
    .put("street", "Main Street")
    .put("city", "Springfield")
    .build();
Struct personStruct = new Struct(personSchema)
    .put("name", "Barbara Liskov")
    .put("age", 75)
    .put("address", addressStruct)
    .build();

因为SchemaBuilder是一个流畅的 API,你实际上可以像自定义的一样嵌入它 admin bool 模式构建器。但这有点难,因为您需要引用 Schema创建 addressStruct .

通常,您只需在编写源连接器时担心如何执行此操作。如果您尝试使用现有的源连接器,您可能几乎无法控制键和值的结构。例如,Confluent's JDBC source connector使用单独的 Schema 为每个表建模并将该表中的每一行作为单独的 Struct使用该架构。但由于行是平的,SchemaStruct将只包含具有原始类型的字段。

Debezium's用于 MySQL 的 CDC 连接器和 PostgreSQL还使用 Schema 为关系表建模并对应 Struct对象,但 CDC 会捕获有关该行的更多信息,例如更改之前和/或之后该行的状态。因此,这些连接器使用 a more complex Schema 对于每个涉及嵌套的表 Struct对象。

请注意,虽然每个源连接器都有自己的消息结构风格,但 Kafka Connect 的 Single Message Transforms (SMTs)使源连接器生成的消息在写入 Kafka 之前或从 Kafka 读取的消息发送到接收器连接器之前非常容易地过滤、重命名和稍加修改。

关于apache-kafka - 汇流的 Kafka Connect 支持复杂或嵌套的 json/schema,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44649913/

相关文章:

logging - 如何使用log4j在日志中打印spring kafka配置

apache-kafka - 具有 SASL 安全性的 Zookeeper 和 Kafka

elasticsearch - 使用 Kafka-Connect 将消息路由到不同的 ElasticSearchIndex

apache-kafka - 连接器中的任务何时变为未分配状态?

java - ElasticsearchSinkConnector 对象映射无法从嵌套更改为非嵌套

go - Uber-go/zap 和 kafka-go 比赛条件

java - spring kafka 寻找主题中最新的可用消息

apache-kafka - 使用 Debezium 提取 key 的转换中不存在字段

mysql - Kafka JDBC Source Connector 没有使用 sql 查询从 mysql 读取数据?

apache-kafka - Zookeeper 错误 : dataDir is not set