apache-kafka - Field does not exist in transform extracting key with Debezium

Tags: apache-kafka apache-kafka-connect debezium ksqldb

I am trying to create a Debezium MySQL connector with a transform that extracts the key.

Before the key transform:

create source connector mysql with(
    "connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
    "database.hostname" = 'mysql',
    "tasks.max" = '1',
    "database.port" = '3306',
    "database.user" = 'debezium',
    "database.password" = 'dbz',
    "database.server.id" = '42',
    "database.server.name" = 'before',
    "table.whitelist" = 'deepprices.deepprices',
    "database.history.kafka.bootstrap.servers" = 'kafka:29092',
    "database.history.kafka.topic" = 'dbz.deepprices',
    "include.schema.changes" = 'true',
    "transforms" = 'unwrap',
    "transforms.unwrap.type" = 'io.debezium.transforms.UnwrapFromEnvelope');

The resulting topic looks like this:
> rowtime: 2020/05/20 16:47:23.354 Z, key: [St@5778462697648631933/8247607644536792125], value: {"id": "P195910", "price": "1511.64"}

When key.converter is set to JSON, the key becomes {"id": "P195910"}.
So I want to extract id from the key and make it a plain string key.

Expected result:
rowtime: 2020/05/20 16:47:23.354 Z, 
key: 'P195910', 
value: {"id": "P195910", "price": "1511.64"}   

When I try a transform with ExtractField or ValueToKey, I get:

DataException: Field does not exist: id:
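The cause is not stated in this post, but one plausible reading (my assumption, not part of the question or answer): if the unwrap step does not actually run, ValueToKey sees the raw Debezium change envelope, in which id sits inside the nested after struct rather than at the top level of the value, so the lookup for a top-level id field fails. The envelope looks roughly like this (field values illustrative):

```json
{
  "before": null,
  "after":  { "id": "P195910", "price": "1511.64" },
  "source": { "...": "..." },
  "op": "c",
  "ts_ms": 1589993243354
}
```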



I tried a statement that includes ValueToKey:
create source connector mysql with(
    "connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
    "database.hostname" = 'mysql',
    "tasks.max" = '1',
    "database.port" = '3306',
    "database.user" = 'debezium',
    "database.password" = 'dbz',
    "database.server.id" = '42',
    "database.server.name" = 'after',
    "table.whitelist" = 'deepprices.deepprices',
    "database.history.kafka.bootstrap.servers" = 'kafka:29092',
    "database.history.kafka.topic" = 'dbz.deepprices',
    "include.schema.changes" = 'true',
    "key.converter" = 'org.apache.kafka.connect.json.JsonConverter',
    "key.converter.schemas.enable" = 'TRUE',
    "value.converter" = 'org.apache.kafka.connect.json.JsonConverter',
    "value.converter.schemas.enable" = 'TRUE',
    "transforms" = 'unwrap,createkey',
    "transforms.unwrap.type" = 'io.debezium.transforms.UnwrapFromEnvelope',
    "transforms.createkey.type" = 'org.apache.kafka.connect.transforms.ValueToKey',
    "transforms.createkey.fields" = 'id'
    );

This produces the following error in my Kafka Connect log:
Caused by: org.apache.kafka.connect.errors.DataException: Field does not exist: id
        at org.apache.kafka.connect.transforms.ValueToKey.applyWithSchema(ValueToKey.java:89)
        at org.apache.kafka.connect.transforms.ValueToKey.apply(ValueToKey.java:67)

Best answer

Changing the transform type from UnwrapFromEnvelope to ExtractNewRecordState solved the problem with the Debezium MySQL CDC connector, version 1.1.0:

"transforms.unwrap.type" = 'io.debezium.transforms.ExtractNewRecordState'
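Putting the fix together with the original goal of a plain string key, a full connector might look like the sketch below. This is not taken from the accepted answer: the extractstring transform name is my own, and switching key.converter to StringConverter (so the extracted id is written as a bare string rather than JSON) is an assumption about the desired output.

```sql
create source connector mysql with(
    "connector.class" = 'io.debezium.connector.mysql.MySqlConnector',
    "database.hostname" = 'mysql',
    "tasks.max" = '1',
    "database.port" = '3306',
    "database.user" = 'debezium',
    "database.password" = 'dbz',
    "database.server.id" = '42',
    "database.server.name" = 'after',
    "table.whitelist" = 'deepprices.deepprices',
    "database.history.kafka.bootstrap.servers" = 'kafka:29092',
    "database.history.kafka.topic" = 'dbz.deepprices',
    "include.schema.changes" = 'true',
    -- StringConverter writes the extracted id as a plain string key
    "key.converter" = 'org.apache.kafka.connect.storage.StringConverter',
    "value.converter" = 'org.apache.kafka.connect.json.JsonConverter',
    "value.converter.schemas.enable" = 'true',
    -- unwrap the envelope, copy id into the key, then flatten the key to a string
    "transforms" = 'unwrap,createkey,extractstring',
    "transforms.unwrap.type" = 'io.debezium.transforms.ExtractNewRecordState',
    "transforms.createkey.type" = 'org.apache.kafka.connect.transforms.ValueToKey',
    "transforms.createkey.fields" = 'id',
    "transforms.extractstring.type" = 'org.apache.kafka.connect.transforms.ExtractField$Key',
    "transforms.extractstring.field" = 'id');
```

ValueToKey copies the named value field into the key as a struct ({"id": "P195910"}), and ExtractField$Key then pulls the single id field out of that struct, yielding the bare 'P195910' key shown in the expected result.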

For "apache-kafka - Field does not exist in transform extracting key with Debezium", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/61919200/
