json - Kafka JDBC Sink Connector gives NullPointerException for messages whose schema has an optional field

Tags: json jdbc apache-kafka apache-kafka-connect confluent-platform

The Kafka JDBC Sink Connector throws a NullPointerException for messages whose schema has an optional field, "parentId". Am I missing something? I am using the out-of-the-box JsonConverter and JDBC Sink Connector.

The message on the Kafka topic is:

{
  "schema":{
    "type":"struct",
    "fields":[
      {
        "field":"id",
        "type":"string"
      },
      {
        "field":"type",
        "type":"string"
      },
      {
        "field":"eventId",
        "type":"string"
      },
      {
        "field":"parentId",
        "type":"string",
        "optional":true
      },
      {
        "field":"created",
        "type":"int64",
        "name":"org.apache.kafka.connect.data.Timestamp",
        "version":1
      }
    ]
  },
  "payload":{
    "id":"asset-1",
    "type":"parcel",
    "eventId":"evt-1",
    "created":1501834166000
  }
}

The connector has these properties:

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
connection.password=admin
topics=asset-topic
tasks.max=1
batch.size=1
auto.evolve=true
connection.user=admin
auto.create=true
connection.url=jdbc:postgresql://postgres-db:5432/fabricdb
value.converter=org.apache.kafka.connect.json.JsonConverter
pk.mode=record_value
pk.fields=id
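One thing worth double-checking with this converter: schema-bearing JSON envelopes like the message above require schemas to be enabled on the value converter. This is the default, so it only matters if it was overridden elsewhere in the worker or connector config:

```properties
value.converter.schemas.enable=true
```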

But the JDBC Sink Connector fails on the optional field parentId:

        org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:514)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:491)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: java.lang.NullPointerException
        at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:698)
        at org.apache.kafka.connect.json.JsonConverter.access$000(JsonConverter.java:61)
        at org.apache.kafka.connect.json.JsonConverter$12.convert(JsonConverter.java:181)
        at org.apache.kafka.connect.json.JsonConverter.convertToConnect(JsonConverter.java:742)
        at org.apache.kafka.connect.json.JsonConverter.jsonToConnect(JsonConverter.java:361)
        at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:350)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:514)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
        ... 13 more

Best Answer

According to the JsonConverter source code, even a field marked as optional must be present in the message payload.

In the JsonConverter class you can find the method that converts a jsonValue into an Object:

    private static Object convertToConnect(Schema schema, JsonNode jsonValue) {
        final Schema.Type schemaType;
        if (schema != null) {
            schemaType = schema.type();
            if (jsonValue.isNull()) {
                if (schema.defaultValue() != null)
                    return schema.defaultValue(); // any logical type conversions should already have been applied
                if (schema.isOptional())
                    return null;
                throw new DataException("Invalid null value for required " + schemaType + " field");
            }
        }
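Note that for a missing field the snippet above never even reaches the optional check: when the field is absent from the payload entirely, Jackson's ObjectNode.get() returns a bare Java null, and calling jsonValue.isNull() on that reference is what throws the NullPointerException. A minimal stdlib-only sketch of the present-with-null vs. absent distinction, using a Map to stand in for Jackson's ObjectNode (which behaves the same way for these lookups):

```java
import java.util.HashMap;
import java.util.Map;

public class MissingVsNull {
    public static void main(String[] args) {
        // Stand-in for the deserialized payload object
        Map<String, Object> payload = new HashMap<>();
        payload.put("id", "asset-1");
        payload.put("parentId", null); // field present, value null

        // Present-with-null: the key exists, so the converter receives a
        // NullNode and can apply the "optional" rule shown above.
        System.out.println(payload.containsKey("parentId")); // true

        // Absent: get() returns a bare Java null; invoking a method on it
        // (jsonValue.isNull() in JsonConverter) throws NullPointerException.
        Object absent = payload.get("someMissingField");
        System.out.println(absent == null); // true
    }
}
```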

So, since the schema in your case declares the field as:

 {
    "field":"parentId",
    "type":"string",
    "optional":true
 }

the value must be present in the message payload. It can be null, but the field itself must be there.
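Applied to your message, that means including the optional field explicitly with a null value (the schema part stays exactly as in your question):

```json
"payload":{
  "id":"asset-1",
  "type":"parcel",
  "eventId":"evt-1",
  "parentId":null,
  "created":1501834166000
}
```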

If you look at the other side, the code responsible for serialization, you can see why: it writes a NullNode for a null reference, so a payload serialized by JsonConverter always carries the optional field.

    private static JsonNode convertToJson(Schema schema, Object logicalValue) {
        if (logicalValue == null) {
            if (schema == null) // Any schema is valid and we don't have a default, so treat this as an optional schema
                return null;
            if (schema.defaultValue() != null)
                return convertToJson(schema, schema.defaultValue());
            if (schema.isOptional())
                return JsonNodeFactory.instance.nullNode();
            throw new DataException("Conversion error: null value for field that is required and has no default value");
        }

Source: a similar question on Stack Overflow: https://stackoverflow.com/questions/54139986/
