jdbc - 如何配置kafka集群进行批处理

标签 jdbc apache-kafka apache-kafka-connect confluent-platform

我正在尝试使用 Kafka 连接仅向 DB(消费者)提供一定数量的新行。为此,我将源配置文件配置为

这是 source.properties 的样子:

 name=source-postgres
 connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
 tasks.max=1
 batch.max.rows = 10
 connection.url=jdbc:postgresql://<URL>/postgres?user=postgres&password=post
 mode=timestamp+incrementing
 timestamp.column.name=updated_at
 incrementing.column.name=id
 topic.prefix=postgres_

这是接收器属性文件的内容

name=dbx-sink
batch.size=5
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1

# The topics to consume from - required for sink connectors like this one
topics=postgres_users

# Configuration specific to the JDBC sink connector.
# We want to connect to a SQLite database stored in the file test.db and auto-create tables.
connection.url=jdbc:postgresql://<URL>:35000/postgres?user=dba&password=nopasswd
auto.create=true

但这没有任何效果,只要有新行可用,它就会被插入到数据库(消费者)中。因此,我向接收器 batch.size=10 添加了另一个配置参数。这也没有效果。

当我启动 connect-standalone.sh 脚本时,我可以在控制台上看到 batch.max.rows = 10。

我做错了什么或如何解决?

最佳答案

batch.max.rows 将每批发送 10 行;它不会限制总共发送的行数。

关于jdbc - 如何配置kafka集群进行批处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46177868/

相关文章:

apache-kafka - 是否可以禁用某些 kafka 主题的缓存?

apache-kafka - 为什么在 Kafka Connect 中使用 JDBC Sink Connector 无法删除记录

java - 用于 SQLite v2.X 的 sqlite-jdbc

postgresql - java - PSQLException : ERROR: syntax error at or near "$1"

java - MySQL 数据库在 8 小时后断开连接。如何预防?

java - 从 kafka 中的轮询记录创建批处理

amazon-s3 - kafka 连接 s3 源无法与 Minio 一起使用

mysql - 将 Async 与 JDBC 结合使用时,doInBackground() 不起作用

kotlin - Spring Cloud Kafka限制单位时间消息消耗

java - 连接到在 Docker 中运行的 Kafka