我正在尝试使用 Kafka 连接仅向 DB(消费者)提供一定数量的新行。为此,我将源配置文件配置为
这是 source.properties 的样子:
name=source-postgres
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
batch.max.rows = 10
connection.url=jdbc:postgresql://<URL>/postgres?user=postgres&password=post
mode=timestamp+incrementing
timestamp.column.name=updated_at
incrementing.column.name=id
topic.prefix=postgres_
这是接收器属性文件的内容
name=dbx-sink
batch.size=5
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
# The topics to consume from - required for sink connectors like this one
topics=postgres_users
# Configuration specific to the JDBC sink connector.
# We want to connect to a SQLite database stored in the file test.db and auto-create tables.
connection.url=jdbc:postgresql://<URL>:35000/postgres?user=dba&password=nopasswd
auto.create=true
但这没有任何效果,只要有新行可用,它就会被插入到数据库(消费者)中。因此,我向接收器 batch.size=10
添加了另一个配置参数。这也没有效果。
当我启动 connect-standalone.sh 脚本时,我可以在控制台上看到 batch.max.rows = 10。
我做错了什么或如何解决?
最佳答案
batch.max.rows
将每批发送 10 行;它不会限制总共发送的行数。
关于jdbc - 如何配置kafka集群进行批处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46177868/