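mysql - Error when trying to load a JDBC sink connector

Tags: mysql jdbc apache-kafka mysql-connector

I am trying to stream data from a Kafka topic into a MySQL database, without success. The source connector works fine (that is, it streams data from the MySQL database into a Kafka topic), but the sink connector fails to load.

Here is my sink-mysql.properties file: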

name=sink-mysql
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=test-mysql-jdbc-foobar
connection.url=jdbc:mysql://127.0.0.1:3306/demo?user=user1&password=user1pass
auto.create=true

When I try to run

./bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/sink-mysql.properties

I get the following error:

[2018-02-01 16:17:43,019] ERROR WorkerSinkTask{id=sink-mysql-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:515)
org.apache.kafka.connect.errors.ConnectException: No fields found using key and value schemas for table: test-mysql-jdbc-foobar
    at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:127)
    at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:64)
    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:71)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:69)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[2018-02-01 16:17:43,020] ERROR WorkerSinkTask{id=sink-mysql-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:517)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.ConnectException: No fields found using key and value schemas for table: test-mysql-jdbc-foobar
    at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:127)
    at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:64)
    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:71)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:66)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:69)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
    ... 10 more
[2018-02-01 16:17:43,021] ERROR WorkerSinkTask{id=sink-mysql-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:173)

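Note that the topic test-mysql-jdbc-foobar contains data streamed from MySQL to Kafka; however, I am not able to stream this data back from Kafka into MySQL. The contents of sink-mysql.properties look the same as those used in Confluent's official documentation, but it does not seem to work. In addition, the mysql-connector is in the right directory (under share/java/kafka-connect-jdbc/).

Edit

Here are the contents of my worker configuration file: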

bootstrap.servers=localhost:9092
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false


# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets

plugin.path=share/java

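Best answer

To be able to use the JDBC sink, your messages must have a schema. This can be achieved by using either Avro + Schema Registry or JSON with schemas. In the worker configuration you posted, you have: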

key.converter.schemas.enable=false
value.converter.schemas.enable=false

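This means the JSON messages will not carry a schema, so the sink has no field definitions to map to table columns. That is what the "No fields found using key and value schemas" error is reporting.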

For an example of the JSON that Kafka Connect will generate (as a source) and expect (as a sink) if you enable schemas, see: https://gist.github.com/rmoff/2b922fd1f9baf3ba1d66b98e9dd7b364
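
As a rough sketch of what that looks like: with schemas.enable=true, the JsonConverter wraps each message in an object with exactly two fields, "schema" and "payload". The Python below builds such an envelope for a hypothetical two-column record and checks whether a given message carries one. The id and name fields, the foobar schema name, and both helper functions are assumptions made for illustration; they are not taken from the question.

```python
import json


def with_schema_envelope(record, field_types, name="foobar"):
    """Wrap a flat dict in a schema/payload envelope.

    field_types maps each field name to a Connect type name such as
    "int32" or "string". Every field is marked optional here, which is
    a simplification for this sketch.
    """
    schema = {
        "type": "struct",
        "name": name,  # hypothetical schema name
        "optional": False,
        "fields": [
            {"field": field, "type": type_name, "optional": True}
            for field, type_name in field_types.items()
        ],
    }
    return {"schema": schema, "payload": record}


def has_schema_envelope(message_json):
    """Return True if the message is an object with exactly the
    top-level fields "schema" and "payload"."""
    message = json.loads(message_json)
    return isinstance(message, dict) and set(message) == {"schema", "payload"}


# Hypothetical record and column types, for illustration only.
envelope = with_schema_envelope(
    {"id": 1, "name": "foo"},
    {"id": "int32", "name": "string"},
)
print(json.dumps(envelope, indent=2))
```

Running has_schema_envelope over a few messages read from the topic is a quick way to see whether the data already in test-mysql-jdbc-foobar was written with schemas enabled. Messages written as plain JSON would need to be produced again with the envelope before a schema-enabled sink can read them.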

Source: the original question on Stack Overflow, https://stackoverflow.com/questions/48565112/
