我正在将我的数据从kafka主题流式传输到elasticsearch中。但它从连接器 {\"type\":\"illegal_argument_exception\",\"reason\":\"对象映射 [search_data] 无法从嵌套更改为非嵌套\"} 引发此错误
但是当我从主题获取消息并使用elasticsearch api手动添加文档时,它工作正常。
kafka-connect-elasticsearch 不支持嵌套对象类型吗?
请帮我回复这个问题,因为我被困在这里好几天了。
Elasticsearch 版本:7.6.2
Kafka Connect 图片:confluenceinc/cp-kafka-connect:5.4.2
以下是我的连接器配置。
{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "es_sink_products",
"key.ignore": "false",
"schema.ignore": "true",
"connection.url": "localhost:9200",
"type.name": "kafka-connect",
"name": "product-elasticsearch-sink",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
Elasticsearch 架构
{
"mappings": {
"properties": {
"search_result_data": {
"properties": {
"product_id": {"type": "long"},
"product_name": {"type": "text"},
}
},
"search_data":{
"type": "nested",
"include_in_parent": false,
"properties": {
"product_id": {"type": "long"},
"full_text": {
"type": "text",
},
}
}
}
}
}
来自主题 es_sink_products
的示例消息
{
"search_data": {
"product_id": 1,
"full_text": "Product 1"
},
"search_result_data": {
"product_id": 1,
"product_name": "Product Name 1"
}
}
这是连接器的完整错误
“org.apache.kafka.connect.errors.ConnectException:由于不可恢复的异常而退出 WorkerSinkTask。\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)\n\tat org. apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)\n\tat org.apache。 kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)\n\tat org.apache.kafka。 connect.runtime.WorkerTask.run(WorkerTask.java:227)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run( FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n由: org.apache.kafka.connect.errors.ConnectException: 批量请求失败: [{\"type\":\"illegal_argument_exception\",\"原因\":\"对象映射 [search_data] 无法从嵌套更改为非嵌套\"}]\n\tat io.confluence.connect.elasticsearch.bulk.BulkProcessor$BulkTask.handleMalformedDoc(BulkProcessor. java:479)\n\tat io.confluence.connect.elasticsearch.bulk.BulkProcessor$BulkTask.execute(BulkProcessor.java:433)\n\tat io.confluence.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call( BulkProcessor.java:389)\n\tat io.confluence.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:375)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java: 266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java. lang.Thread.run(Thread.java:748)\n\tat io.confluence.connect.elasticsearch.bulk.BulkProcessor$BulkProcessorThread.run(BulkProcessor.java:370)\n"
最佳答案
当之前设置了非嵌套映射并且您尝试使用嵌套类型更新该映射时,会引发此错误。
你现在可以做的是:
- 删除索引
- 设置一次
嵌套
映射(上面所说的Elasticsearch Schema
) - 使用选项
"schema.ignore": "false"启动kafka流
原因:由于有效负载的索引方式,将非嵌套更改为嵌套
被视为“重大更改”。
关于java - ElasticsearchSinkConnector 对象映射无法从嵌套更改为非嵌套,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62154639/