jdbc - 卡夫卡连接: JDBC Source Connector : create Topic with multiple partitions

标签 jdbc apache-kafka hdfs apache-kafka-connect confluent-platform

我创建了一个示例管道,从 MySQL 轮询数据并将数据写入 HDFS(也包括 hive 表)。

由于我的要求,我需要为每个数据库表创建源+连接器对。 接下来,我发布了源连接器和接收器连接器的配置设置。

我可以看到一个主题是用一个分区创建的,复制因子为 1。

主题创建应该是自动的,这意味着我无法在创建 Source+Sink 对之前手动创建主题。

我的问题:

1) 有没有办法在创建源连接器时配置分区数量和复制因子?

2)如果可以创建多个分区,Source Connector 使用什么样的分区策略?

3) 应该为源连接器和接收器连接器创建正确的工作线程数量是多少?

源连接器:

{
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "mode": "timestamp+incrementing",
  "timestamp.column.name": "modified",
  "incrementing.column.name": "id",
  "topic.prefix": "jdbc_var_cols-",
  "tasks.max": "1",
  "poll.interval.ms": "1000",
  "query": "SELECT id,name,email,department,modified FROM test",
  "connection.url": "jdbc:mariadb://127.0.0.1:3306/connect_test?user=root&password=confluent"
}

水槽连接器:

{
  "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
  "topics.dir": "/user/datalake/topics-hive-var_cols3",
  "hadoop.conf.dir": "/tmp/quickstart/hadoop/conf",
  "flush.size": "5",
  "schema.compatibility": "BACKWARD",
  "connect.hdfs.principal": "<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="22464356434e434947626f7b7067636e6f0c6e6d61636e" rel="noreferrer noopener nofollow">[email protected]</a>",
  "connect.hdfs.keytab": "/tmp/quickstart/datalake.keytab",
  "tasks.max": "3",
  "topics": "jdbc_var_cols-",
  "hdfs.url": "hdfs://mycluster:8020",
  "hive.database": "kafka_connect_db_var_cols3",
  "hdfs.authentication.kerberos": "true",
  "rotate.interval.ms": "1000",
  "hive.metastore.uris": "thrift://hive_server:9083",
  "hadoop.home": "/tmp/quickstart/hadoop",
  "logs.dir": "/logs",
  "format.class": "io.confluent.connect.hdfs.avro.AvroFormat",
  "hive.integration": "true",
  "hdfs.namenode.principal": "nn/<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="b7e8fff8e4e3f7faeee5f2f6fbfa99fbf8f4f6fb" rel="noreferrer noopener nofollow">[email protected]</a>",
  "hive.conf.dir": "/tmp/quickstart/hadoop/conf"
}

最佳答案

1) Is there a way to configure the number of partitions and replication factor when creating the Source Connector?

不是来自 Connect,不是。

听起来您在代理上启用了自动主题创建,因此它使用默认值。理想情况下应在生产环境中禁用此功能,因此您必须提前创建主题。

what kind of partitioning strategy does the Source Connector use?

取决于哪个连接器以及代码的编写方式(即是否/如何生成记录的 key )。举例来说,对于 JDBC 连接器,键可能是数据库表的主键。它将使用 DefaultPartitioner 进行哈希处理。我不相信 Connect 允许您在每个连接器级别指定自定义分区程序。如果键为空,则消息将分布在所有分区上。

3) Whats the correct number of workers should be created for Source and Sink Connectors?

同样,取决于来源。对于 JDBC,每个表都有一个任务。

但是,对于接收器,任务最多只能达到正在接收的主题的分区数量(与所有消费者组一样)。


此外,您通常会与数据库(和 Hadoop 集群)分开运行 Connect 集群

关于jdbc - 卡夫卡连接: JDBC Source Connector : create Topic with multiple partitions,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54665050/

相关文章:

java - SQL错误或缺少数据库(没有这样的表:Employeeinfo) using sqlite and net beans

java - 适用于 Java 7 的 JDBC 驱动程序

apache-kafka - 为什么kafka索引文件使用内存映射文件,而日志文件不使用?

java - 如何在 Hadoop 文件系统中获取绝对路径?

hadoop - 如何在运行 copyFromLocal 命令时更改复制因子?

hadoop - 锁定 HDFS 中的目录

java - 无法使用 MySql 中的预准备语句创建数据库

sql - 如何使用 NamedJdbcTemplate 只选择一个字符串值

kubernetes - 水平 Pod Autoscale 无法读取指标

java - kafka KStream - 进行 n 秒计数的拓扑