mysql - How to tell the Debezium MySQL source connector to stop re-snapshotting tables that already exist in Kafka topics?

Tags: mysql apache-kafka apache-kafka-connect debezium

I am using the Debezium MySQL CDC source connector to move a database from MySQL to Kafka. The connector works fine except that snapshotting behaves strangely: the connector took the first snapshot successfully, then a few hours later it went down because of a heap-memory limit (that is not the issue here). I paused the connector, stopped the workers on the cluster, fixed the problem, and started the workers again... The connector is now running fine, but it took the snapshot again! It looks like the connector did not resume from where it left off. I suspect something is wrong in my configuration. I am using Debezium 0.9.5.
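For reference, pausing and resuming a connector is typically done through the Kafka Connect REST API. A minimal sketch, assuming the Connect worker listens on localhost:8083 and the connector is named mysql-cdc-replication as in the configuration below:

# Pause the connector; its tasks stop polling but committed offsets are kept
curl -X PUT http://localhost:8083/connectors/mysql-cdc-replication/pause

# ...stop the workers, fix the heap issue, start the workers again...

# Resume the connector and check that its tasks are RUNNING
curl -X PUT http://localhost:8083/connectors/mysql-cdc-replication/resume
curl -s http://localhost:8083/connectors/mysql-cdc-replication/status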

I changed snapshot.mode from initial to initial_only, but it did not help.
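A sketch of how such a configuration change is usually pushed, assuming the same worker address and connector name as above; note that PUT /connectors/{name}/config expects the complete property map, not only the changed key:

# Only two keys are shown here; the rest of the properties from the block
# below must be included in the request body as well
curl -X PUT -H "Content-Type: application/json" \
  http://localhost:8083/connectors/mysql-cdc-replication/config \
  -d '{ "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "snapshot.mode": "initial_only" }'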

Connector properties:

{
  "properties": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "snapshot.locking.mode": "minimal",
    "errors.log.include.messages": "false",
    "table.blacklist": "mydb.someTable",
    "include.schema.changes": "true",
    "database.jdbc.driver": "com.mysql.cj.jdbc.Driver",
    "database.history.kafka.recovery.poll.interval.ms": "100",
    "poll.interval.ms": "500",
    "heartbeat.topics.prefix": "__debezium-heartbeat",
    "binlog.buffer.size": "0",
    "errors.log.enable": "false",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "snapshot.fetch.size": "100000",
    "errors.retry.timeout": "0",
    "database.user": "kafka_readonly",
    "database.history.kafka.bootstrap.servers": "bootstrap:9092",
    "internal.database.history.ddl.filter": "DROP TEMPORARY TABLE IF EXISTS .+ /\\* generated by server \\*/,INSERT INTO mysql.rds_heartbeat2\\(.*\\) values \\(.*\\) ON DUPLICATE KEY UPDATE value \u003d .*,FLUSH RELAY LOGS.*,flush relay logs.*",
    "heartbeat.interval.ms": "0",
    "header.converter": "org.apache.kafka.connect.json.JsonConverter",
    "autoReconnect": "true",
    "inconsistent.schema.handling.mode": "fail",
    "enable.time.adjuster": "true",
    "gtid.new.channel.position": "latest",
    "ddl.parser.mode": "antlr",
    "database.password": "pw",
    "name": "mysql-cdc-replication",
    "errors.tolerance": "none",
    "database.history.store.only.monitored.tables.ddl": "false",
    "gtid.source.filter.dml.events": "true",
    "max.batch.size": "2048",
    "connect.keep.alive": "true",
    "database.history": "io.debezium.relational.history.KafkaDatabaseHistory",
    "snapshot.mode": "initial_only",
    "connect.timeout.ms": "30000",
    "max.queue.size": "8192",
    "tasks.max": "1",
    "database.history.kafka.topic": "history-topic",
    "snapshot.delay.ms": "0",
    "database.history.kafka.recovery.attempts": "100",
    "tombstones.on.delete": "true",
    "decimal.handling.mode": "double",
    "snapshot.new.tables": "parallel",
    "database.history.skip.unparseable.ddl": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "table.ignore.builtin": "true",
    "database.whitelist": "mydb",
    "bigint.unsigned.handling.mode": "long",
    "database.server.id": "6022",
    "event.deserialization.failure.handling.mode": "fail",
    "time.precision.mode": "adaptive_time_microseconds",
    "errors.retry.delay.max.ms": "60000",
    "database.server.name": "host",
    "database.port": "3306",
    "database.ssl.mode": "disabled",
    "database.serverTimezone": "UTC",
    "task.class": "io.debezium.connector.mysql.MySqlConnectorTask",
    "database.hostname": "host",
    "database.server.id.offset": "10000",
    "connect.keep.alive.interval.ms": "60000",
    "include.query": "false"
  }
}

Best answer

I can confirm Gunnar's answer above. I ran into some problems during snapshotting and had to restart the whole snapshot process. At the moment the connector does not support resuming a snapshot from a given point. Your configuration looks fine to me. Hope this helps.
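One way to see why a snapshot is repeated is to check whether the worker actually persisted a binlog offset for the connector; with snapshot.mode=initial, Debezium snapshots again whenever no recorded offset is found on startup. A sketch, assuming the Apache Kafka CLI tools, the bootstrap server from the configuration above, and that the worker's offset.storage.topic is connect-offsets (it may be named differently in your deployment):

# Dump the Connect offsets topic; if no offset record exists for the connector,
# it will snapshot again on restart
kafka-console-consumer.sh --bootstrap-server bootstrap:9092 \
  --topic connect-offsets --from-beginning \
  --property print.key=true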

Original question on Stack Overflow: https://stackoverflow.com/questions/57807223/
