jdbc - 如何在 Kafka Connect JDBC Source 连接器中添加显式 WHERE 子句

标签 jdbc apache-kafka db2 apache-kafka-connect

我正在使用 kafka 连接到从 DB2 到 kafka 主题的源数据,我正在配置 sql 查询以从 DB2 读取数据,下面是查询

SELECT SEQ_I AS error_id, TRIM(SEND_I) AS sca , to_char(CREATE_TS,'YYYY-MM-DD HH24:MI:SS.FF3') AS create_timestamp, CREATE_TS, TRIM(ERR_MSG) AS error_message , CASE substr(ERR_MSG,1,locate('-',ERR_MSG)-1) WHEN 'WARNING' THEN 'W' WHEN 'SUSPENDED' THEN 'F' END ERROR_TYPE FROM INTCHG_ERROR_DIR WHERE TRAN_I ='503' AND PRCS_N = 'GLOBAL'

我正在使用设置 "timestamp.column.name": "CREATE_TS" 这里的问题是在查询中它们已经是 WHERE 子句,而 kafka connect 试图添加另一个带有时间戳列的 where 子句,它正在产生问题,另一个问题是,如果我从 sql 子句中删除 where 子句,如下所示

SELECT SEQ_I AS error_id, TRIM(SEND_I) AS sca , to_char(CREATE_TS,'YYYY-MM-DD HH24:MI:SS.FF3') AS create_timestamp, CREATE_TS, TRIM(ERR_MSG) AS error_message , CASE substr(ERR_MSG,1,locate('-',ERR_MSG)-1) WHEN 'WARNING' THEN 'W' WHEN 'SUSPENDED' THEN 'F' END ERROR_TYPE FROM INTCHG_ERROR_DIR

然后 substr 出现错误,如下所示

SQL Error [22011]: THE SECOND OR THIRD ARGUMENT OF THE SUBSTR OR SUBSTRING FUNCTION IS OUT OF RANGE. SQLCODE=-138, SQLSTATE=22011, DRIVER=4.19.26

任何人都可以就这两个问题提出建议,我被困在这一点上。

最佳答案

发生这种情况是因为您试图同时使用 "mode": "timestamp"queryTimestampIncrementingTableQuerierWHERE 子句附加到与 query 中现有 WHERE 子句冲突的查询。

JDBC source connector docs对此很清楚:

query

If specified, the query to perform to select new or updated rows. Use this setting if you want to join tables, select subsets of columns in a table, or filter data. If used, this connector will only copy data using this query -- whole-table copying will be disabled. Different query modes may still be used for incremental updates, but in order to properly construct the incremental query, it must be possible to append a WHERE clause to this query (i.e. no WHERE clauses may be used). If you use a WHERE clause, it must handle incremental queries itself.


作为解决方法,您可以将查询修改为(取决于您使用的 SQL 风格)

SELECT * FROM ( SELECT * FROM table WHERE ...)

WITH a AS
   SELECT * FROM b
    WHERE ...
SELECT * FROM a

例如,在您的情况下,查询应该是

"query":"SELECT * FROM (SELECT SEQ_I AS error_id, TRIM(SEND_I) AS sca , to_char(CREATE_TS,'YYYY-MM-DD HH24:MI:SS.FF3') AS create_timestamp, CREATE_TS, TRIM(ERR_MSG) AS error_message , CASE substr(ERR_MSG,1,locate('-',ERR_MSG)-1) WHEN 'WARNING' THEN 'W' WHEN 'SUSPENDED' THEN 'F' END ERROR_TYPE FROM INTCHG_ERROR_DIR WHERE TRAN_I ='503' AND PRCS_N = 'GLOBAL') o"

关于jdbc - 如何在 Kafka Connect JDBC Source 连接器中添加显式 WHERE 子句,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56629360/

相关文章:

java - 在 JAVA 中 - 如何将 json 数据插入嵌套的 json 属性?

java - 为什么使用 getter 函数代替 "?"时会出错

java - 尝试在 yahoo streaming-benchmark 上运行 STORM_TEST 时出错

db2 - 在 DB2 和 RPG 中设置 NULL 值

sql - 游标 SQL_CURLH200C1 未处于准备状态

java - PreparedStatement 忽略查询中的参数 : java. sql.SQLException : Parameter index out of range (1 > number of parameters, 为 0)

apache-kafka - KSQL创建具有多列聚合的表

database - 使用 db2batch 的 db2 过程 SYSPROC.ADMIN_DROP_SCHEMA

hibernate - Elasticsearch 需要数据库吗?