sql-server - 如何使用 Debezium 将 250 个表从 MS SQL 提取到 Kafka 中

标签 sql-server apache-kafka apache-kafka-connect

嗨,我尝试在作为源的 PostgreSQL 和作为目标的 SQL Server 之间构建 Kafka 连接管道。我使用了3个Kafka代理,需要消费252个主题(一个主题与一张PostgreSQL表相同)。运行一个多小时后,只能从252张表中拉出218张。我发现的错误是SQL Server中有死锁机制,它可以将事务保存到SQL Server并尝试重试,而且Debezium复制槽也在那里。

我使用分布式连接器,水槽上最多有 3 个工作人员,但也许这似乎还不够。还可以尝试使用更高的 offset.time_out.ms 至 60000 和更高的偏移分区 (100)。这恐怕不是我想要的制作水平。任何人都可以对这个案例提出建议吗?是否有任何计算可以确定我需要的最佳 worker 数量?

更新

这是我遇到的一些错误。我看到一些连接器被杀死。 有人告诉我SQL SERVER 中发生了死锁:

[2020-03-26 15:06:28,494] ERROR WorkerSinkTask{id=sql_server_sink_XXA-0} RetriableException from SinkTask: (org.apache.kafka.connect.runtime.WorkerSinkTask:552)
org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: Transaction (Process ID 62) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.

    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:93)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: Transaction (Process ID 62) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.

2020 年 4 月 14 日更新

我仍然有这个问题,我忘了告诉我如何部署连接器。现在我使用 2 个工作人员,一名用于源,一名用于接收器。我在 csv 中列出所有表和 pk,并循环遍历行以创建连接器,无需 sleep 或等待每分钟。我还对每个主题使用单个主题分区和 3 个副本。但我仍然有 sql server 连接死锁

最佳答案

问题可能是同时有多个任务访问同一个 SQL 表,并导致同步问题,如您提到的死锁。
由于您已经拥有大量主题,并且您的连接器可以并行访问它们,因此我建议您将每个主题的分区数减少到 1(在Kafka,因此您应该删除并使用新的分区数量重新创建每个主题)。
这样,每个主题只有一个分区;每个分区只能在单个线程(/task/consumer)中访问,因此没有机会对同一个表进行并行 SQL 事务。

或者,更好的方法是创建具有 3 个分区的单个主题(与您拥有的任务/使用者的数量相同),并让生产者使用 SQL 表名称作为消息键
Kafka 保证具有相同键的消息始终进入相同的分区,因此具有相同表的所有消息将驻留在单个分区上(单线程消耗)。

如果您觉得它有用,我可以附加有关如何创建 Kafka Producer 和发送键控消息的更多信息。

关于sql-server - 如何使用 Debezium 将 250 个表从 MS SQL 提取到 Kafka 中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60814203/

相关文章:

java - 重启数据库后尝试重启 debezium mysql 连接器时出错

SQL:如果我的最大值是 255,使用 tinyint 而不是 Integer 是否有效?

sql-server - 如何在另一个存储过程中使用sp_configure?

docker - 卡夫卡连接 : Multiple DB2 JDBC Source Connectors fail

kubernetes - 如何在 Kubernetes 上为 Kafka-connect 创建连接器?

apache-kafka - java.lang.RuntimeException : Failed to resolve Oracle database version 错误

SQL Server 2008 差异数据库

sql - 比较两个不同表中的不相同字段

elasticsearch - Kafka连接器Elasticsearch topic.regex

amazon-s3 - 如何使用其字段和基于时间的分区为 json 配置 kafka s3 接收器连接器?