json - Kafka Connect serialization error in Elastic Sink

Tags: json elasticsearch serialization apache-kafka apache-kafka-connect

I am using the Kafka Elasticsearch sink connector to deliver incoming messages to ES, but I am running into the following error:

[2018-10-05 13:01:21,388] ERROR WorkerSinkTask{id=elasticsearch.sink.direct-10} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:

Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
 at [Source: (byte[])"  "; line: 1, column: 2]
Caused by: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
 at [Source: (byte[])"  "; line: 1, column: 2]
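
The parser is complaining about a control character with code 0, that is, a NUL byte in the raw record. As a rough diagnostic, the following Python sketch locates the first such byte in a record dumped from the topic. The function name `first_control_byte` and the sample bytes are hypothetical and are not part of Kafka Connect.

```python
from typing import Optional, Tuple

# Control characters that JSON allows as whitespace between tokens: \t, \n, \r.
ALLOWED_CONTROL_BYTES = {0x09, 0x0A, 0x0D}


def first_control_byte(raw: bytes) -> Optional[Tuple[int, int]]:
    """Return (offset, byte value) of the first disallowed control byte, or None."""
    for offset, value in enumerate(raw):
        if value < 0x20 and value not in ALLOWED_CONTROL_BYTES:
            return offset, value
    return None


if __name__ == "__main__":
    # Hypothetical samples: one clean JSON record and one that starts with NUL bytes.
    print(first_control_byte(b'{"MY_SETTING_ID": 9}\n'))
    print(first_control_byte(b"\x00\x00"))
```

If this reports a byte at or near offset 0, the record is probably not plain JSON text at all, and it is worth checking how the producer serialized it.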

When I run the console consumer with the print.key property set to true, the incoming key and value messages look like this:
{
    "schema": {
        "type": "struct",
        "fields": [{
                "type": "int32",
                "optional": false,
                "field": "MY_SETTING_ID"
            }
        ],
        "optional": false
    },
    "payload": {
        "MY_SETTING_ID": 9
    }
}


{
    "schema": {
        "type": "struct",
        "fields": [{
                "type": "int32",
                "optional": false,
                "field": "MY_SETTING_ID"
            }, {
                "type": "string",
                "optional": true,
                "field": "MY_SETTING_NAME"
            }
        ],
        "optional": false
    },
    "payload": {
        "MY_SETTING_ID": 9,
        "MY_SETTING_NAME": "setting_name"
    }
}

Here, MY_SETTING_ID acts as the key.

I have the following standalone properties file:
bootstrap.servers=dev-insight-kafka01:9092,dev-insight-kafka02:9092,dev-insight-kafka03:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/apps/dev/logs/offsets/elasticsearch-direct.offsets
offset.flush.interval.ms=120000
rest.port=8099

plugin.path=/usr/share/java
producer.max.request.size=10485760
consumer.auto.offset.reset=latest
consumer.session.timeout.ms=300000
consumer.request.timeout.ms=310000
flush.timeout.ms=160000
heartbeat.interval.ms=60000
session.timeout.ms=200000

And the sink properties file:
name=elasticsearch.sink.direct
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=16
topics=stream.app_setting

connection.url=http://dev-elastic-search01:9200
type.name=logs
topic.index.map=stream.app_setting:direct_app_setting_index
batch.size=2048
max.buffered.records=32768
flush.timeout.ms=60000
max.retries=10
retry.backoff.ms=1000
schema.ignore=true

I would appreciate it if someone could look over my properties files and tell me where I might be going wrong.

Best answer

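Because your messages embed the schema in the JSON itself (the "schema" / "payload" envelope), you should enable schemas in both converters: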

key.converter.schemas.enable=true
value.converter.schemas.enable=true
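
To check which setting matches the data actually on the topic, one option is to inspect a few raw records. The Python sketch below assumes that JsonConverter with schemas.enable=true expects a JSON object with exactly the two top-level fields "schema" and "payload", as in the messages shown in the question. The function name `suggested_schemas_enable` and the sample records are hypothetical.

```python
import json
from typing import Optional


def suggested_schemas_enable(raw: bytes) -> Optional[bool]:
    """Suggest a schemas.enable value for one raw record.

    True  -> the record is a {"schema": ..., "payload": ...} envelope
    False -> the record is valid JSON without that envelope
    None  -> the record is not valid UTF-8 JSON at all
    """
    try:
        # Decode explicitly so stray NUL bytes are not mistaken for UTF-16/32.
        doc = json.loads(raw.decode("utf-8"))
    except ValueError:  # covers both UnicodeDecodeError and JSONDecodeError
        return None
    return isinstance(doc, dict) and set(doc) == {"schema", "payload"}


if __name__ == "__main__":
    # Hypothetical samples modelled on the key message from the question.
    envelope = (
        b'{"schema": {"type": "struct", "fields": [{"type": "int32", '
        b'"optional": false, "field": "MY_SETTING_ID"}], "optional": false}, '
        b'"payload": {"MY_SETTING_ID": 9}}'
    )
    print(suggested_schemas_enable(envelope))
    print(suggested_schemas_enable(b'{"MY_SETTING_ID": 9}'))
    print(suggested_schemas_enable(b"\x00\x00\x00\x00\x01"))
```

A result of None means the record is not JSON text, in which case changing schemas.enable alone is unlikely to help and the producer's serializer should be checked.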

Source: a similar question on Stack Overflow, "json - Kafka Connect serialization error in Elastic Sink": https://stackoverflow.com/questions/52672427/
