elasticsearch - 尝试使用 Kafka Connect 在 Elasticsearch 中索引 kafka 主题

标签 elasticsearch apache-kafka apache-kafka-connect

我想将 avro 中的 kafka 主题索引为 elasticsearch 格式,但是 我的时间戳字段无法被识别 elasticsearch 作为日期格式字段。

我对连接器使用了以下配置。

   {
          "name": "es-sink-barchart-10",
      "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "value.converter.schema.registry.url": "http://localhost:8081",

        "connection.url": "http://localhost:9200",

        "type.name":"type.name=kafka-connect",

        "topics": "exchange_avro_01",

        "topic.index.map": "exchange_avro_01:exchange_barchart",

        "key.ignore": "true"
     }
    }

原始字段是bigint类型,我希望目标字段是日期类型,具有elasticsearch的任何有效格式。我定义了一个动态模板来尝试通过以下方式解决它:

curl -XPUT "http://localhost:9200/_template/kafkaconnect/" -H 'Content-Type: application/json' -d'
{
  "index_patterns": "exchange*",
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0
  },
  "mappings": {
    "kafka-connect": {
      "dynamic_templates": [
    {
          "dates": {
        "match_mapping_type": "long",
            "match": "TIME",
            "mapping": {
              "type": "date",
          "format": "yyyy-MM-dd HH:mm:ss"
            }
          }
        }
      ]
     ,
      "properties": {
          "CLOSE": {
            "type": "double"
          },
         .
         .
         .
        }
      }

    }
  }
}'

当我加载上述连接器时,没有任何内容被索引到elasticsearch。

有什么帮助吗?

最佳答案

如果你的源是一个bigint,那么它可能是一个纪元。如果它是一个纪元,那么这将不起作用:

"mapping": {
      "type": "date",
      "format": "yyyy-MM-dd HH:mm:ss"
        }

因为您告诉 Elasticsearch 日期格式为 yyyy-MM-dd HH:mm:ss(事实并非如此)。

因此,请尝试此操作(暂时忽略您的自定义映射;首先使其正常工作,然后将其添加回来):

{
  "index_patterns": "exchange*",
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0
  },
  "mappings": {
    "kafka-connect": {
      "dynamic_templates": [
        {
          "dates": {
            "match": "TIME",
            "mapping": {
              "type": "date"
            } } } ] } } }

另请引用:https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-field-mapping.html#date-detection

nothing is indexed to elasticsearch.

检查 Kafka Connect 工作日志和 Elasticsearch 日志是否有任何错误。

关于elasticsearch - 尝试使用 Kafka Connect 在 Elasticsearch 中索引 kafka 主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53323383/

相关文章:

python - Elasticsearch:具有多个字段的单个 "more-like-this"查询与具有单个字段的多个 "more-like-this"查询

java - 如何在Spark中将JavaPairInputDStream转换为DataSet/DataFrame

java - 启用 SSL 后 Kafka Connect 超出 Java 堆空间

docker - kafka-connect-jdbc源连接器OOM

python - 使用 python 在 Elasticsearch 中进行身份验证

ruby-on-rails-4 - 嵌套排序无法按预期工作 ElasticSearch

java - 我们如何使用 API 从 IDE 在 Kafka 中创建主题

java - Kafka Streams 表转换

elasticsearch - Elasticsearch “keep_types”过滤器不适用于 “pattern” token 生成器

security - 加密 Kafka 中的数据?