postgresql - 查询模式下的kafka jdbc source connector错误

标签 postgresql jdbc apache-kafka

<分区>

我在 kafka 中有一个 JDBCSourceConnector,它使用查询从数据库流式传输数据。 但是我为选择数据而编写的查询有问题。

我在 Postgres psql 和 DBeaver 中测试了查询。它工作正常,但在 kafka 配置中,它会产生 SQL 语法错误

错误

ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='null', query='select "Users".* from "Users" join "SchoolUserPivots" on "Users".id = "SchoolUserPivots".user_id where school_id = 1 and role_id = 2', topicPrefix='teacher', timestampColumn='"Users".updatedAt', incrementingColumn='id'}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:221) org.postgresql.util.PSQLException: ERROR: syntax error at or near "WHERE"

配置json

 {
  "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
  "timestamp.column.name": "\"Users\".updatedAt",
  "incrementing.column.name": "id",
  "connection.password": "123",
  "tasks.max": "1",
  "query": "select \"Users\".* from \"Users\" join \"SchoolUserPivots\" on \"Users\".id = \"SchoolUserPivots\".user_id where school_id = 1 and role_id = 2",
  "timestamp.delay.interval.ms": "5000",
  "mode": "timestamp+incrementing",
  "topic.prefix": "teacher",
  "connection.user": "user",
  "name": "SourceTeacher",
  "connection.url": "jdbc:postgresql://ip:5432/school",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter"
}

最佳答案

您不能将 "mode": "timestamp+incrementing", 与包含 WHERE 的自定义 query 一起使用。

参见 https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector了解更多详情,以及 https://github.com/confluentinc/kafka-connect-jdbc/issues/566 .该 github 问题提出了一种解决方法,即对您的查询使用子选择。

关于postgresql - 查询模式下的kafka jdbc source connector错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56786337/

相关文章:

java - ResultSet 和 Select * 性能

java - log4j2 如何从文件中读取属性变量到 log4j2

java - kafka 获取主题的分区数

go - Sarama Kafka ConsumerGroup函数返回

sql - 在一列上选择 Distinct,不按该列排序

ruby-on-rails - 在不同服务器上使用 Rails 的 postgresql COPY 命令的问题

mysql - JBDC和MYSQL空值插入表

sql - 试用 PostgreSQL/Postgres(我是一个普通的 MySQL 用户)——推荐任何引用资料,任何陷阱?

sql - 是否可以在 PostgreSQL 中创建一个带有变量名的表?

docker - Py4JJavaError : An error occurred while calling o45. 加载。 : java. lang.NoClassDefFoundError:org/apache/spark/sql/sources/v2/StreamWriteSupport