apache-kafka - 在 Kafka Connect 中使用自定义转换器?

标签 apache-kafka apache-kafka-connect

我正在尝试将自定义转换器与 Kafka Connect 一起使用,但我似乎无法正确使用它。我希望有人对此有经验并可以帮助我解决这个问题!

初步情况

会发生什么?

当连接器启动时,它们会正确加载 jar 并找到自定义转换器。确实,这就是我在日志中看到的:

[2017-10-10 13:06:46,274] INFO Registered loader: PluginClassLoader{pluginLocation=file:/opt/custom-connectors/custom-converter-1.0-SNAPSHOT.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)
[2017-10-10 13:06:46,274] INFO Added plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
[...]
[2017-10-10 13:07:43,454] INFO Added aliases 'CustomStringConverter' and 'CustomString' to plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)

然后我将 JSON 配置发布到连接器节点之一以创建我的连接器:

{
  "name": "hdfsSinkCustom",
  "config": {
    "topics": "yellow",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "custom.CustomStringConverter",
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "hdfs.url": "hdfs://hdfs-namenode:8020/hdfs-sink",
    "topics.dir": "yellow_storage",
    "flush.size": "1",
    "rotate.interval.ms": "1000"
  }
}

并收到以下回复:

{
   "error_code": 400,
   "message": "Connector configuration is invalid and contains the following 1 error(s):\nInvalid value custom.CustomStringConverter for configuration value.converter: Class custom.CustomStringConverter could not be found.\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
}

我错过了什么?

如果我尝试运行 Kafka Connect stadnalone,错误消息是相同的。

有人遇到过这种情况吗?我错过了什么?

最佳答案

好的,感谢 Kafka 用户邮件列表中的 Philip Schmitt,我找到了解决方案。

他提到了这个问题:https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-6007 ,这确实是我面临的问题。

引用他的话:

To test this, I simply copied my SMT jar to the folder of the connector I was using and adjusted the plugin.path property.

确实,我通过将转换器放入连接器的文件夹中消除了这个错误。

我还尝试了其他方法:创建一个自定义连接器并将该自定义连接器与自定义转换器一起使用,两者都作为插件加载。它也有效。

总结:转换器由连接器加载。如果您的连接器是插件,您的转换器也应该是插件。如果您的连接器不是插件(与您的 kafka 连接发行版捆绑在一起),那么您的转换器也不应该是。

关于apache-kafka - 在 Kafka Connect 中使用自定义转换器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46712095/

相关文章:

docker - 向主机 Kafka 服务器生成消息时出现容器化 Kafka 客户端错误

apache-kafka - Druid Kafka 通过 read-your-writes 进行摄取

elasticsearch - Kafka Elasticsearch 连接器时间戳

java - 使用 FileStreamSink 连接器将 Kafka 数据写入二进制文件

apache-kafka - Debezium Kafka 连接错误 - TimeoutException : Timeout expired while fetching topic metadata

apache-kafka - 卡夫卡有重复的消息

java - 如何从ConsumerGroup中的所有分区获取最后一条日志

apache-kafka - 了解 Confluent 控制中心

oracle - 无法使用 confluent CLI : java. sql.SQLException 设置 CLASSPATH:找不到适合 jdbc:oracle:thin 的驱动程序

java - 如何基于 Avro Schema 将 XML 转换为 AVRO?