只能使用 Confluent Kafka Connect 将简单对象插入到数据库中。不知道如何让这个支持复杂的 json/schema 结构。我不确定此功能是否可用。有一个类似的问题here大约一年前问过,但直到现在还没有回答。请帮忙。
最佳答案
Kafka Connect 确实支持复杂的结构,包括 Struct
, Map
, 和 Array
.通常只有源连接器需要这样做,因为接收器连接器传递值并且只需要使用它们。 This documentation描述了建立 Schema
的基础知识描述 Struct
的对象,然后创建一个 Struct
遵守该模式的实例。在这种情况下,示例结构只是一个平面结构。
但是,您可以轻松添加 Struct
类型的字段。用另一个 Schema
定义的实例。实际上,它只是将这个简单的模式分层到结构中的多个级别:
Schema addressSchema = SchemaBuilder.struct().name(ADDRESS)
.field("number", Schema.INT16_SCHEMA)
.field("street", Schema.STRING_SCHEMA)
.field("city", Schema.STRING_SCHEMA)
.build();
Schema personSchema = SchemaBuilder.struct().name(NAME)
.field("name", Schema.STRING_SCHEMA)
.field("age", Schema.INT8_SCHEMA)
.field("admin", new SchemaBuilder.boolean().defaultValue(false).build())
.field("address", addressSchema)
.build();
Struct addressStruct = new Struct(addressSchema)
.put("number", 100)
.put("street", "Main Street")
.put("city", "Springfield")
.build();
Struct personStruct = new Struct(personSchema)
.put("name", "Barbara Liskov")
.put("age", 75)
.put("address", addressStruct)
.build();
因为
SchemaBuilder
是一个流畅的 API,你实际上可以像自定义的一样嵌入它 admin
bool 模式构建器。但这有点难,因为您需要引用 Schema
创建 addressStruct
.通常,您只需在编写源连接器时担心如何执行此操作。如果您尝试使用现有的源连接器,您可能几乎无法控制键和值的结构。例如,Confluent's JDBC source connector使用单独的
Schema
为每个表建模并将该表中的每一行作为单独的 Struct
使用该架构。但由于行是平的,Schema
和 Struct
将只包含具有原始类型的字段。Debezium's用于 MySQL 的 CDC 连接器和 PostgreSQL还使用
Schema
为关系表建模并对应 Struct
对象,但 CDC 会捕获有关该行的更多信息,例如更改之前和/或之后该行的状态。因此,这些连接器使用 a more complex Schema
对于每个涉及嵌套的表 Struct
对象。请注意,虽然每个源连接器都有自己的消息结构风格,但 Kafka Connect 的 Single Message Transforms (SMTs)使源连接器生成的消息在写入 Kafka 之前或从 Kafka 读取的消息发送到接收器连接器之前非常容易地过滤、重命名和稍加修改。
关于apache-kafka - 汇流的 Kafka Connect 支持复杂或嵌套的 json/schema,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44649913/