apache-kafka - Kafka Connect REST接口(interface) "PUT/connectors/(string: name)/config"返回错误代码500

标签 apache-kafka apache-kafka-connect

我有一个分布式模式的 3 节点 kafka-connect 工作集群,带有正在运行的 s3 接收器连接器。为了在运行时更新连接器的配置,我运行 command如下:

curl -X PUT -H "Content-Type: application/json" --data @s3.json http://localhost:8083/connectors/<connector-name>/config

但是,它返回:

{"error_code":500,"message":"Cannot deserialize instance of `java.lang.String` out of START_OBJECT token\n at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 14] (through reference chain: java.util.LinkedHashMap[\"config\"])"}

worker的完整日志在这里:

 [2018-11-19 15:10:47,471] ERROR Uncaught exception in REST call to /connectors/s3-sink-common-log/config (org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper:61)
com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `java.lang.String` out of START_OBJECT token
 at [Source: (org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$UnCloseableInputStream); line: 1, column: 14] (through reference chain: java.util.LinkedHashMap["config"])
    at com.fasterxml.jackson.databind.exc.MismatchedInputException.from(MismatchedInputException.java:63)
    at com.fasterxml.jackson.databind.DeserializationContext.reportInputMismatch(DeserializationContext.java:1342)
    at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1138)
    at com.fasterxml.jackson.databind.DeserializationContext.handleUnexpectedToken(DeserializationContext.java:1092)
    at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:63)
    at com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:10)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:527)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:364)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:29)
    at com.fasterxml.jackson.databind.ObjectReader._bind(ObjectReader.java:1574)
    at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:965)
    at com.fasterxml.jackson.jaxrs.base.ProviderBase.readFrom(ProviderBase.java:815)
    at org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$TerminalReaderInterceptor.invokeReadFrom(ReaderInterceptorExecutor.java:257)
    at org.glassfish.jersey.message.internal.ReaderInterceptorExecutor$TerminalReaderInterceptor.aroundReadFrom(ReaderInterceptorExecutor.java:236)
    at org.glassfish.jersey.message.internal.ReaderInterceptorExecutor.proceed(ReaderInterceptorExecutor.java:156)
    at org.glassfish.jersey.server.internal.MappableExceptionWrapperInterceptor.aroundReadFrom(MappableExceptionWrapperInterceptor.java:73)
    at org.glassfish.jersey.message.internal.ReaderInterceptorExecutor.proceed(ReaderInterceptorExecutor.java:156)
    at org.glassfish.jersey.message.internal.MessageBodyFactory.readFrom(MessageBodyFactory.java:1091)
    at org.glassfish.jersey.message.internal.InboundMessageContext.readEntity(InboundMessageContext.java:874)
    at org.glassfish.jersey.server.ContainerRequest.readEntity(ContainerRequest.java:271)
    at org.glassfish.jersey.server.internal.inject.EntityParamValueParamProvider$EntityValueSupplier.apply(EntityParamValueParamProvider.java:97)
    at org.glassfish.jersey.server.internal.inject.EntityParamValueParamProvider$EntityValueSupplier.apply(EntityParamValueParamProvider.java:80)
    at org.glassfish.jersey.server.spi.internal.ParamValueFactoryWithSource.apply(ParamValueFactoryWithSource.java:74)
    at org.glassfish.jersey.server.spi.internal.ParameterValueHelper.getParameterValues(ParameterValueHelper.java:92)
    at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$AbstractMethodParamInvoker.getParamValues(JavaResourceMethodDispatcherProvider.java:133)
    at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$ResponseOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:200)
    at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:103)
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:493)
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:415)
    at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:104)
    at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:277)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:272)
    at org.glassfish.jersey.internal.Errors$1.call(Errors.java:268)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:316)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:298)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:268)
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:289)
    at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:256)
    at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:703)
    at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:416)
    at org.glassfish.jersey.servlet.WebComponent.service(WebComponent.java:370)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:389)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:342)
    at org.glassfish.jersey.servlet.ServletContainer.service(ServletContainer.java:229)
    at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:865)
    at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:535)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
    at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1595)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
    at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1317)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
    at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:473)
    at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1564)
    at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
    at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1219)
    at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
    at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:126)
    at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:169)
    at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    at org.eclipse.jetty.server.Server.handle(Server.java:531)
    at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:352)
    at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:260)
    at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:281)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:102)
    at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:118)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
    at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.produce(EatWhatYouKill.java:132)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:762)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:680)
    at java.lang.Thread.run(Thread.java:748)
[2018-11-19 15:10:47,476] INFO 127.0.0.1 - - [19/Nov/2018:23:10:47 +0000] "PUT /connectors/s3-sink-common-log/config HTTP/1.1" 500 294  38 (org.apache.kafka.connect.runtime.rest.RestServer:60)

背景: 工作集群位于 Ubuntu 16.04 上,软件包 confluence-kafka-connect-s3=5.0.0-1 和 confluence-kafka-2.11=2.0.0-1。 apache-kafka的版本是2.0.0-cp1。 s3.json 文件的内容是正确的,因为我可以删除当前连接器并使用新的 json 文件重新创建它。

最佳答案

已解决。对于 PUT/connectors/(string: name)/config API,s3.json 的内容是错误的。如果POST/connectors API的json文件格式如下:

{
  "name": "<connector-name>",
  "config": { 
     "<blabla...>",
  }
}

那么正确的 json 文件格式为:

{ 
   "name": "<connector-name>",
   "<blabla...>"
}

关于apache-kafka - Kafka Connect REST接口(interface) "PUT/connectors/(string: name)/config"返回错误代码500,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53384144/

相关文章:

java - 并行 KafkaStream 处理的更好方法?

java - spring-kafka:如何将一个方法传递给另一个方法,该方法(根据设计)永远不会返回?

带有 Kafka 和 Zookeeper 的 Docker 镜像

apache-kafka - kafka connect的debezium源连接器无法生产到kafka怎么办

jdbc - Kafka JDBC Connect(源和接收器)和 Informix

apache-kafka - 如何从头开始使用Kafka Consumer API读取数据?

apache-kafka - kafka服务器可以强制消费属性吗?

apache-kafka - Kafka 连接 AWS Lambda 接收器

python - 将avro字节逻辑类型十进制反序列化为十进制

elasticsearch - 卡夫卡连接 : Creating a new connector in distributed mode is creating new group