java - ElasticsearchSinkConnector 对象映射无法从嵌套更改为非嵌套

标签 java elasticsearch apache-kafka apache-kafka-connect

我正在将我的数据从kafka主题流式传输到elasticsearch中。但它从连接器 {\"type\":\"illegal_argument_exception\",\"reason\":\"对象映射 [search_data] 无法从嵌套更改为非嵌套\"} 引发此错误

但是当我从主题获取消息并使用elasticsearch api手动添加文档时,它工作正常。

kafka-connect-elasticsearch 不支持嵌套对象类型吗?

请帮我回复这个问题,因为我被困在这里好几天了。

Elasticsearch 版本:7.6.2

Kafka Connect 图片:confluenceinc/cp-kafka-connect:5.4.2

以下是我的连接器配置。

{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "es_sink_products",
    "key.ignore": "false",
    "schema.ignore": "true",
    "connection.url": "localhost:9200",
    "type.name": "kafka-connect",
    "name": "product-elasticsearch-sink",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
}

Elasticsearch 架构

{
  "mappings": {
    "properties": {
      "search_result_data": {
        "properties": {
          "product_id": {"type": "long"},
          "product_name": {"type": "text"},
        }
      },
      "search_data":{
        "type": "nested",
        "include_in_parent": false,
        "properties": {
          "product_id": {"type": "long"},
          "full_text": {
            "type": "text",
          },
        }
      }
    }
  }
}

来自主题 es_sink_products 的示例消息

{
    "search_data": {
        "product_id": 1,
        "full_text": "Product 1"
    },
    "search_result_data": {
        "product_id": 1,
        "product_name": "Product Name 1"
    }
}

这是连接器的完整错误 “org.apache.kafka.connect.errors.ConnectException:由于不可恢复的异常而退出 WorkerSinkTask。\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)\n\tat org. apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)\n\tat org.apache。 kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)\n\tat org.apache.kafka。 connect.runtime.WorkerTask.run(WorkerTask.java:227)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run( FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n由: org.apache.kafka.connect.errors.ConnectException: 批量请求失败: [{\"type\":\"illegal_argument_exception\",\"原因\":\"对象映射 [search_data] 无法从嵌套更改为非嵌套\"}]\n\tat io.confluence.connect.elasticsearch.bulk.BulkProcessor$BulkTask.handleMalformedDoc(BulkProcessor. java:479)\n\tat io.confluence.connect.elasticsearch.bulk.BulkProcessor$BulkTask.execute(BulkProcessor.java:433)\n\tat io.confluence.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call( BulkProcessor.java:389)\n\tat io.confluence.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:375)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java: 266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java. lang.Thread.run(Thread.java:748)\n\tat io.confluence.connect.elasticsearch.bulk.BulkProcessor$BulkProcessorThread.run(BulkProcessor.java:370)\n"

最佳答案

当之前设置了非嵌套映射并且您尝试使用嵌套类型更新该映射时,会引发此错误。

你现在可以做的是:

  1. 删除索引
  2. 设置一次嵌套映射(上面所说的Elasticsearch Schema)
  3. 使用选项"schema.ignore": "false"启动kafka流

原因:由于有效负载的索引方式,将非嵌套更改为嵌套被视为“重大更改”。

关于java - ElasticsearchSinkConnector 对象映射无法从嵌套更改为非嵌套,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62154639/

相关文章:

java - 在映射器之间共享 FSDataInputStream?

java - 使用 xtext 和 maven 解决构建问题

node.js - ElasticSearch - 搜索精确的文本匹配而不在索引中保留两个副本?

elasticsearch - 在Elastic搜索中基于定界符对字符串进行标记

apache-kafka - 如何将一个 Kafka 主题拆分为多个较小的 Kafka 主题?

apache-kafka - 与 2.10 包融合构建包含 scala 2.11 jar

java - 在 API 中使用 java.util.Date 的充分理由

java - 在 Maven 存储库中查找 Oracle JDBC 驱动程序

php - elasticsearch用php和curl更新文档

apache-kafka - kafka ack=all 和 min-isr