csv - Appending a JSON array to Elasticsearch with Logstash from CSV

Tags: csv elasticsearch logstash

How can I use Logstash to append an array of JSON objects to a document in Elasticsearch from a CSV?

Example CSV:

The CSV contains the rows:

id,key1,key2
1,toto1,toto2
1,titi1,titi2
2,tata1,tata2

The result should be 2 documents:
{
    "id": 1,
    "keys": [{
        "key1": "toto1",
        "key2": "toto2"
    }, {
        "key1": "titi1",
        "key2": "titi2"
    }]
},
{
    "id": 2,
    "keys": [{
        "key1": "tata1",
        "key2": "tata2"
    }]
}
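For comparison, the grouping being asked for (rows sharing an id collapse into one document carrying an array) can be sketched outside of Logstash. A minimal Python sketch, with the sample CSV inlined; the `keys` attribute name is the one used by the answer's mapping:

```python
import csv
import io
import json
from collections import defaultdict

# Sample CSV from the question, inlined to keep the sketch self-contained
csv_text = """id,key1,key2
1,toto1,toto2
1,titi1,titi2
2,tata1,tata2
"""

# Collapse rows that share an id into one array of {key1, key2} objects
grouped = defaultdict(list)
for row in csv.DictReader(io.StringIO(csv_text)):
    grouped[row["id"]].append({"key1": row["key1"], "key2": row["key2"]})

# Two documents, one per id ("keys" holds the array of inner objects)
documents = [{"id": doc_id, "keys": keys} for doc_id, keys in grouped.items()]
print(json.dumps(documents, indent=2))
```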

Kind regards

Best answer

First, create the ES mapping (if needed), declaring the inner objects as nested:

{
  "mappings": {
    "key_container": {
      "properties": {
        "id": {
          "type": "keyword",
          "index": true
        },
        "keys": {
          "type": "nested",
          "properties": {
            "key1": {
              "type": "keyword",
              "index": true
            },
            "key2": {
              "type": "text",
              "index": true
            }
          }
        }
      }
    }
  }
}

The keys property will hold the array of nested objects.

Then you can load the CSV in two passes with Logstash:
  • index (create) the base object containing only the id property
  • update the base object with the keys property containing the array of nested objects

First Logstash configuration (only the relevant part):

    filter {
        csv {
            columns => ["id","key1","key2"]
            separator => ","
            # Remove the keys because they will be loaded in the next pass with an update
            remove_field => [ "key1", "key2"]
        }
        # Drop the header row containing the column names
        if [id] == "id" {
            drop { }
        }
    }
    output {
        elasticsearch {
            action => "index"
            document_id => "%{id}"
            hosts => [ "localhost:9200" ]
            index => "key_container"
        }
    }
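What this first pass achieves can be simulated: because `document_id => "%{id}"`, rows that share an id overwrite the same document, so after the pass each id exists exactly once, carrying only the `id` field. A rough Python simulation, where a plain dict stands in for the Elasticsearch index:

```python
# Rows as the csv filter would emit them, before remove_field runs
rows = [
    {"id": "1", "key1": "toto1", "key2": "toto2"},
    {"id": "1", "key1": "titi1", "key2": "titi2"},
    {"id": "2", "key1": "tata1", "key2": "tata2"},
]

index = {}  # stands in for the key_container index
for row in rows:
    doc = {"id": row["id"]}  # remove_field dropped key1/key2
    index[doc["id"]] = doc   # action => "index" overwrites by document_id
print(index)
```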
    

For the second pass, the Logstash configuration (you must enable scripting in Elasticsearch):

    filter {
        csv {
            columns => ["id","key1","key2"]
            separator => ","
        }
        # Convert the attributes into an object called 'key' that is passed to the script below (via the 'event' object)
        mutate{
            rename => {
                "key1" => "[key][key1]"
                "key2" => "[key][key2]"
            }
        }
    }
    output {
        elasticsearch {
            action => "update"
            document_id => "%{id}"
            doc_as_upsert => "true"
            hosts => [ "localhost:9200" ]
            index => "key_container"
            script_lang => "groovy"
            # key_container.keys is an array of key objects
            # arrays can be built only with scripts and defined as an array when we put the first element into it
            script => "if (ctx._source.containsKey('keys')) {ctx._source.keys += event.key} else {ctx._source.keys = [event.key]}"
        }
    }
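The Groovy one-liner in `script` implements append-or-create: on the first update for an id there is no `keys` field yet, so the script creates it as a single-element array; each later update appends. The same logic in a small Python sketch, where the dict stands in for `ctx._source` and `key` for the `event.key` object:

```python
def apply_update(source, key):
    # Mirrors the script: append to 'keys' if it exists, otherwise create it
    if "keys" in source:
        source["keys"] = source["keys"] + [key]
    else:
        source["keys"] = [key]
    return source

# Replay the three CSV rows against the base documents from the first pass
doc1 = {"id": "1"}
apply_update(doc1, {"key1": "toto1", "key2": "toto2"})
apply_update(doc1, {"key1": "titi1", "key2": "titi2"})
doc2 = apply_update({"id": "2"}, {"key1": "tata1", "key2": "tata2"})
print(doc1)
print(doc2)
```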
    

To sum up, two passes are needed because building the array requires a script, and scripting is only available with the update action.

Regarding "csv - appending a JSON array to Elasticsearch with Logstash", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/36127961/
