apache-kafka - 连接器配置不包含连接器类型

标签 apache-kafka apache-kafka-connect confluent-platform

我正在尝试使用 JDBC Connector连接到我集群上的 PostgreSQL 数据库(该数据库不直接由集群管理)。

我一直在使用以下命令调用 Kafka Connect:

connect-standalone.sh worker.properties jdbc-connector.properties

这是worker.properties文件的内容:

class=io.confluent.connect.jdbc.JdbcSourceConnector
name=test-postgres-1
tasks.max=1

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/home/user/offest
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.json.JsonConverter

connection.url=jdbc:postgresql://database-server.url:port/database?user=user&password=password

这是 jdbc-connector.properties 的内容:

mode=incrementing
incrementing.column.name=id
topic.prefix=test-postgres-jdbc-

当我尝试使用上述命令启动连接器时,它崩溃并出现以下错误:

[2018-04-16 11:39:08,164] ERROR Failed to create job for jdbc.properties (org.apache.kafka.connect.cli.ConnectStandalone:88)
[2018-04-16 11:39:08,166] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:99)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector config {mode=incrementing, incrementing.column.name=pdv, topic.prefix=test-postgres-jdbc-} contains no connector type
    at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:80)
    at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:67)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:96)
Caused by: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector config {mode=incrementing, incrementing.column.name=id, topic.prefix=test-postgres-jdbc-} contains no connector type
    at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:233)
    at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:158)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:93)

在注意到导致错误的连接器仅显示来自 jdbc-connector.properties 的信息后,我尝试将这两个文件合并在一起,但是命令突然终止(没有创建主题或偏移文件)具有以下输出:

[SLF4J infos...]
[2018-04-16 11:48:54,620] INFO Usage: ConnectStandalone worker.properties connector1.properties [connector2.properties ...] (org.apache.kafka.connect.cli.ConnectStandalone:59)

最佳答案

您需要在 jdbc-connector.properties 中拥有大部分这些属性,而不是在 worker.properties 中。参见 https://docs.confluent.io/current/connect/connect-jdbc/docs/source_config_options.html有关连接器配置中配置选项的完整列表(jdbc-connector.properties 在您的示例中)。

试试这个:

  • worker.properties:

    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    
    offset.storage.file.filename=/home/user/offest
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter=org.apache.kafka.connect.json.JsonConverter
    
  • jdbc-connector.properties:

    class=io.confluent.connect.jdbc.JdbcSourceConnector
    name=test-postgres-1
    tasks.max=1
    
    mode=incrementing
    incrementing.column.name=id
    topic.prefix=test-postgres-jdbc-
    
    connection.url=jdbc:postgresql://database-server.url:port/database?user=user&password=password
    

您可以在此处查看更多有关 Kafka Connect 的示例:

关于apache-kafka - 连接器配置不包含连接器类型,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49856692/

相关文章:

mysql - 如何配置 Debezium Mysql 连接器来生成原始键而不是 struct 或 json 对象?

mongodb - 如何通过 Kafka Connector 将数据从 Kafka 流式传输到 MongoDB

linux - Kafka 连接 - 无法连接到本地主机端口 8083 : Connection refused

java - 编译 Confluence KsqlDB 时的类型转换问题

c# - 我无法使用 SSL(TLS) 作为 kafka 生产者从 C# 发布 JSON 消息

python - 如何强制关闭用 python 打开的套接字?

python - 如何使用 Pykafka 获取主题的最新消息?

apache-kafka - WAN 上的 Kafka 生产者/消费者?

apache-kafka - Kafka Connect JDBC 连接器 - 由于不可恢复的异常而退出 WorkerSinkTask

amazon-s3 - 如何使用 Kafka S3 Sink Connector 中的 FieldPartitioner 创建分区