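jdbc - Kafka JDBC Connect (source and sink) and Informix

Tags: jdbc apache-kafka apache-kafka-connect informix

Is there any way to convert ":" to "." in the query that the JDBC connector constructs? The connector currently generates a statement of this form:

"SELECT * FROM database:user:table WHERE database:user:table"

Connector configuration: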

{
    "name": "jdbc_source_connector",
    "config": {
        "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url" : "jdbc:informix-sqli://IP:PORT/databasa:informixserver=oninit;user=user;password=password",
        "topic.prefix" : "table-",
        "poll.interval.ms" : "100000",
        "mode" : "incrementing",
        "table.whitelist" : "table",
        "query.suffix" : ";",
        "incrementing.column.name" : "lp"
    }
}

Error:

[2021-02-03 14:03:02,809] INFO Begin using SQL query: SELECT * FROM  database  :  user  :  table  WHERE  database  :  user  :  table : lp  > ? ORDER BY  database  :  user  :  table : lp  ASC ; (io.confluent.connect.jdbc.source.TableQuerier:164)
[2021-02-03 14:03:02,853] ERROR Failed to run query for table TimestampIncrementingTableQuerier{table="database "." user "." table", query='null', topicPrefix='database-', incrementingColumn='lp', timestampColumns=[]}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:404)
java.sql.SQLSyntaxErrorException: A syntax error has occurred.

Connector session:

informix database:/usr/informix$ onstat -g ses 544271

IBM Informix Dynamic Server Version 12.10.FC13 -- On-Line -- Up 60 days 18:26:06 -- 4985440 Kbytes

session           effective                                               #RSAM    total      used       dynamic
id       user     user      tty      pid      hostname                    threads  memory     memory     explain
544271   user      -         -        1266352  kafkahost                  1        172032     102672     off

Program :
Thread[id:175, name:task-thread-jdbc_source_connector_database, path:/app/kafka_2.13-2.7.0/plugins/kafka-connect-jdbc-10.0.1/lib/jdbc-4.50.4.1.jar]

tid      name     rstcb            flags    curstk   status
583042   sqlexec  7000000437451a8  Y--P---  6224     cond wait  netnorm   -

Memory pools    count 2
name         class addr              totalsize  freesize   #allocfrag #freefrag
544271       V     70000004f6c7040  167936     68592      113        35
544271*O0    V     700000065ad0040  4096       768        1          1

name           free       used           name           free       used
overhead       0          6656           scb            0          144
opentable      0          11352          filetable      0          1040
log            0          16536          temprec        0          22688
keys           0          816            gentcb         0          1592
ostcb          0          3472           sqscb          0          25128
hashfiletab    0          552            osenv          0          2056
sqtcb          0          9336           fragman        0          640
sapi           0          144            udr            0          520

sqscb info
scb              sqscb            optofc   pdqpriority optcompind  directives
7000000343d3360  700000039194028  0        0           0           1

Sess       SQL            Current            Iso Lock       SQL  ISAM F.E.
Id         Stmt type      Database           Lvl Mode       ERR  ERR  Vers  Explain
544271     -              database                CR  Not Wait   0    0    9.28  Off

Last parsed SQL statement :
  SELECT * FROM  database  :  user  :  table  WHERE  database  :  user  :  table :
    lp  > ? ORDER BY  database  :  user  :  table : lp  ASC

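Best answer

The only workaround I found is to add the query option to the connector configuration. With "query" set, the connector runs the statement you supply instead of building one from the fully qualified table name, so "table.whitelist" is dropped. In this mode "topic.prefix" is used as the full name of the topic.

Modified configuration: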

{
    "name": "jdbc_source_connector",
    "config": {
        "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url" : "jdbc:informix-sqli://IP:PORT/databasa:informixserver=oninit;user=user;password=password",
        "topic.prefix" : "table-tablename",
        "poll.interval.ms" : "100000",
        "mode" : "incrementing",
        "query" : "select * from table ",
        "incrementing.column.name" : "lp"
    }
}
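
The change between the two configurations can be sketched as a small Python helper. This is only an illustration, not part of the connector: the function name to_query_mode is hypothetical, and the placeholder values (IP, PORT, table, lp) are copied from the question as-is.

```python
import json


def to_query_mode(config, query, topic):
    """Return a copy of a JDBC source config that uses a custom query.

    Hypothetical helper for illustration; it only rewrites the dict.
    """
    updated = dict(config)
    # A custom query replaces table selection, so drop the whitelist.
    updated.pop("table.whitelist", None)
    # The answer's configuration also omits the ";" suffix.
    updated.pop("query.suffix", None)
    updated["query"] = query
    # With a custom query, topic.prefix is used as the full topic name.
    updated["topic.prefix"] = topic
    return updated


original = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:informix-sqli://IP:PORT/databasa:informixserver=oninit;user=user;password=password",
    "topic.prefix": "table-",
    "poll.interval.ms": "100000",
    "mode": "incrementing",
    "table.whitelist": "table",
    "query.suffix": ";",
    "incrementing.column.name": "lp",
}

modified = to_query_mode(original, "select * from table ", "table-tablename")
print(json.dumps(modified, indent=4))
```

The printed JSON is the "config" map only. It could, for example, be sent to the Kafka Connect REST API with PUT /connectors/jdbc_source_connector/config to update the existing connector.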

Source: the original question "jdbc - Kafka JDBC Connect (source and sink) and Informix" on Stack Overflow: https://stackoverflow.com/questions/66029521/
