elasticsearch - How to resolve the "schema.registry.url" issue when connecting Elasticsearch with Kafka Connect?

Tags: elasticsearch apache-kafka apache-kafka-connect confluent-schema-registry

I am trying to connect Kafka Connect with an Elasticsearch sink. I am not running it under Confluent but in standalone mode. Here is my elasticsearch connector configuration:

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=mysql-jdbc-mall
key.ignore=true
schema.ignore=true
connection.url=http://172.**.*.**:5601
type.name=kafka-connect
elastic.security.protocol=SSL
key.converter.schemas.enable=false
value.converter.schemas.enable=false

My connect-standalone.properties is:

bootstrap.servers=Ni****ing:9092
key.converter=io.confluent.connect.avro.AvroConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/share/java

When I run the connector, I get this error:

[2020-01-21 09:31:03,676] ERROR Failed to start task elasticsearch-sink-0 (org.apache.kafka.connect.runtime.Worker:464)
io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
        at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
        at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:100)
        at io.confluent.connect.avro.AvroConverterConfig.<init>(AvroConverterConfig.java:27)
        at io.confluent.connect.avro.AvroConverter.configure(AvroConverter.java:58)
        at org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:268)
        at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:440)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.createConnectorTasks(StandaloneHerder.java:311)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.updateConnectorTasks(StandaloneHerder.java:336)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:214)
        at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:115)
[2020-01-21 09:31:03,677] INFO Created connector elasticsearch-sink (org.apache.kafka.connect.cli.ConnectStandalone:112)

Update

Since I do not have the "/etc/schema-registry" directory, I changed connect-standalone.properties to:

bootstrap.servers=Nifi-Staging:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/share/java

But when I use JsonConverter, I get this error:

[2020-01-21 16:12:04,939] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
java.lang.NullPointerException
        at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.getServerVersion(JestElasticsearchClient.java:231)
        at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:142)
        at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:133)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:122)
        at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:51)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:300)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:189)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2020-01-21 16:12:04,946] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
[2020-01-21 16:12:04,946] INFO Stopping ElasticsearchSinkTask (io.confluent.connect.elasticsearch.ElasticsearchSinkTask:190)

Best Answer

io.confluent.connect.avro.AvroConverter requires schema.registry.url to be defined.

Remove both schemas.enable properties, since they only apply to JSON; Avro always has a schema. Then add the registry URL in their place:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://...
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://...

You can find sample connect properties files under the etc/schema-registry folder of a Confluent Platform installation (for example, connect-avro-standalone.properties).

If you are not using Avro, change the converters to match your data. The key and the value can also be entirely different types, as in the sketch below.
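For instance, a minimal sketch of a mixed setup with plain string keys and Avro values (the registry URL here is a placeholder, not taken from this post):

# Hypothetical worker config: string keys need no registry, Avro values do.
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
# Placeholder URL; 8081 is the Schema Registry's default port.
value.converter.schema.registry.url=http://localhost:8081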


Also, the Elasticsearch URL needs to be different; for example, something running on port 9200, not Kibana on port 5601.
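A quick way to check that you are pointing at Elasticsearch itself rather than Kibana (the host is a placeholder for your server):

# Elasticsearch's REST API on 9200 returns cluster info including a "version" field;
# Kibana's port 5601 does not, which is consistent with the NullPointerException
# in getServerVersion shown above.
curl http://<your-es-host>:9200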


"I am not running it under Confluent but in standalone mode."

I assume you mean the confluent CLI command? That just runs kafka-connect-distributed for you, and distributed mode is actually preferred.
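If you do try distributed mode, connector configs are submitted as JSON over the worker's REST API rather than as a properties file. A rough sketch using this post's connector (the worker address and the corrected Elasticsearch URL are assumptions):

# POST the connector config to the Connect worker's REST API (default port 8083).
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "mysql-jdbc-mall",
    "key.ignore": "true",
    "schema.ignore": "true",
    "connection.url": "http://<your-es-host>:9200",
    "type.name": "kafka-connect"
  }
}'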

Regarding elasticsearch - How to resolve the "schema.registry.url" issue when connecting Elasticsearch with Kafka Connect?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/59834015/
