apache-kafka - debezium - Changing the topic name causes a cross-database reference error

Tags: apache-kafka apache-kafka-connect confluent-platform confluent-schema-registry debezium

I am working with this debezium-examples setup.

source.json

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
  }
}
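
For reference, here is a minimal sketch of what that RegexRouter transform does to a topic name, using plain java.util.regex rather than the actual SMT class:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RouteDemo {
    public static void main(String[] args) {
        // Same regex and replacement as the transforms.route.* settings above.
        // RegexRouter renames a topic only when the regex matches the whole name.
        Pattern route = Pattern.compile("([^.]+)\\.([^.]+)\\.([^.]+)");
        Matcher m = route.matcher("dbserver1.inventory.customers");
        if (m.matches()) {
            System.out.println(m.replaceAll("$3")); // prints "customers"
        }
    }
}

With the transform in place, every change event lands in a short topic named after the table (customers, orders, ...), which is why the sink below can simply subscribe to topics=customers.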

jdbc-sink.json

{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "customers",
    "connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_value"
  }
}
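
For completeness: both configs are registered by POSTing them to the Kafka Connect REST API. A minimal sketch, assuming the REST endpoint on localhost:8083 (the default in the debezium-examples compose file) and jdbc-sink.json in the working directory:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;

public class RegisterConnector {
    public static void main(String[] args) throws Exception {
        // POST the connector config to Kafka Connect's REST API (Java 11+).
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofFile(Path.of("jdbc-sink.json")))
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}

The tutorial itself uses curl for this step; the Java client is shown only to keep all examples here in one language.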

I have run this example and it works fine. But when I make the changes described in the scenario below, it gives me a "cross-database references" error.

Scenario

I removed these properties from the source connector:

    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"

Now it creates the topics in Kafka with Debezium's default naming, serverName.databaseName.tableName:

dbserver1.inventory.products

dbserver1.inventory.products_on_hand

dbserver1.inventory.customers

dbserver1.inventory.orders

When I specify topics=dbserver1.inventory.customers in the jdbc-sink, it gives me the following exception:

 ERROR:  cross-database references are not implemented: 
"dbserver1.inventory.customers" at character 14
postgres_1   | STATEMENT:  CREATE TABLE "dbserver1"."inventory"."customers" (
postgres_1   |  "last_name" TEXT NOT NULL,
postgres_1   |  "id" INT NOT NULL,
postgres_1   |  "first_name" TEXT NOT NULL,
postgres_1   |  "email" TEXT NOT NULL,
postgres_1   |  PRIMARY KEY("id"))
connect_1    | 2019-01-29 09:39:18,931 WARN   ||  Create failed, will attempt amend if table already exists   [io.confluent.connect.jdbc.sink.DbStructure]
connect_1    | org.postgresql.util.PSQLException: ERROR: cross-database references are not implemented: "dbserver1.inventory.customers"
connect_1    |   Position: 14

Note: this is not a duplicate, because I have posted other questions covering different scenarios.

Best Answer

Change inventory -> dbserver1 in the connection.url.

PostgreSQL parses the quoted identifier "dbserver1"."inventory"."customers" as (database name).(schema name).(table name), and it refuses to reference any database other than the one the session is connected to. Connecting to a database named dbserver1 makes the first part match the current database, so the auto-generated CREATE TABLE statement is accepted:

{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "dbserver1.inventory.customers",
    "connection.url": "jdbc:postgresql://postgres:5432/dbserver1?user=postgresuser&password=postgrespw",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.fields": "id",
    "pk.mode": "record_value"
  }
}
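
To verify the fix, one can query the auto-created table over the same connection settings. A minimal JDBC sketch (assumes the PostgreSQL driver on the classpath and that it runs inside the compose network, where the postgres hostname resolves):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class VerifySink {
    public static void main(String[] args) throws SQLException {
        // Same database (dbserver1) and credentials as the fixed connection.url.
        String url = "jdbc:postgresql://postgres:5432/dbserver1?user=postgresuser&password=postgrespw";
        try (Connection conn = DriverManager.getConnection(url);
             Statement st = conn.createStatement();
             // The three-part name is legal now that "dbserver1" is the current database.
             ResultSet rs = st.executeQuery(
                     "SELECT count(*) FROM \"dbserver1\".\"inventory\".\"customers\"")) {
            if (rs.next()) {
                System.out.println("replicated rows: " + rs.getLong(1));
            }
        }
    }
}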

Regarding "apache-kafka - debezium - Changing the topic name causes a cross-database reference error", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/54418451/
