How can I use Logstash to append an array of JSON objects to Elasticsearch from a CSV?
Example CSV
The CSV contains the rows:
id,key1,key2
1,toto1,toto2
1,titi1,titi2
2,tata1,tata2
The result should be 2 documents:
{
  "id": 1,
  "keys": [{
    "key1": "toto1",
    "key2": "toto2"
  }, {
    "key1": "titi1",
    "key2": "titi2"
  }]
}
,{
  "id": 2,
  "keys": [{
    "key1": "tata1",
    "key2": "tata2"
  }]
}
Kind regards
Best answer
First, create the ES mapping (if needed), declaring the inner objects as nested objects:
{
  "mappings": {
    "key_container": {
      "properties": {
        "id": {
          "type": "keyword",
          "index": true
        },
        "keys": {
          "type": "nested",
          "properties": {
            "key1": {
              "type": "keyword",
              "index": true
            },
            "key2": {
              "type": "text",
              "index": true
            }
          }
        }
      }
    }
  }
}
The keys property will contain the array of nested objects.
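To make the target document shape concrete, here is a minimal Python sketch (illustrative only, not part of the Logstash pipeline; the helper name `group_rows` is hypothetical) that groups the CSV rows by id into documents matching this mapping:

```python
import csv
import io
import json

def group_rows(csv_text):
    """Group CSV rows by 'id' into documents with a nested 'keys' array."""
    docs = {}
    for row in csv.DictReader(io.StringIO(csv_text)):
        # One document per id; duplicate ids contribute extra nested objects
        doc = docs.setdefault(row["id"], {"id": row["id"], "keys": []})
        doc["keys"].append({"key1": row["key1"], "key2": row["key2"]})
    return list(docs.values())

csv_text = "id,key1,key2\n1,toto1,toto2\n1,titi1,titi2\n2,tata1,tata2\n"
docs = group_rows(csv_text)
print(json.dumps(docs, indent=2))
```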
Then you can load the CSV with Logstash in two passes:
The first Logstash configuration (only the relevant parts):
filter {
  csv {
    columns => ["id","key1","key2"]
    separator => ","
    # Remove the keys because they will be loaded in the next hop with update
    remove_field => [ "key1", "key2"]
  }
  # Remove the row containing the column names
  if [id] == "id" {
    drop { }
  }
}
output {
  elasticsearch {
    action => "index"
    document_id => "%{id}"
    hosts => [ "localhost:9200" ]
    index => "key_container"
  }
}
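After this first pass, the index holds one bare document per id: because the index action uses document_id => "%{id}", rows that share an id simply overwrite each other. A small Python sketch of that deduplication effect (the `index` dict stands in for the ES index, keyed by document_id):

```python
import csv
import io

csv_text = "id,key1,key2\n1,toto1,toto2\n1,titi1,titi2\n2,tata1,tata2\n"

index = {}  # simulates the ES index, keyed by document_id
for row in csv.DictReader(io.StringIO(csv_text)):
    # action => "index" with a fixed document_id overwrites any previous doc,
    # and the key1/key2 fields were removed by the filter
    index[row["id"]] = {"id": row["id"]}

print(index)  # {'1': {'id': '1'}, '2': {'id': '2'}}
```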
The second Logstash configuration (you have to enable scripting in Elasticsearch):
filter {
  csv {
    columns => ["id","key1","key2"]
    separator => ","
  }
  # Convert the attributes into an object called 'key' that is passed to the script below (via the 'event' object)
  mutate {
    rename => {
      "key1" => "[key][key1]"
      "key2" => "[key][key2]"
    }
  }
}
output {
  elasticsearch {
    action => "update"
    document_id => "%{id}"
    doc_as_upsert => "true"
    hosts => [ "localhost:9200" ]
    index => "key_container"
    script_lang => "groovy"
    # key_container.keys is an array of key objects
    # arrays can be built only with scripts and defined as an array when we put the first element into it
    script => "if (ctx._source.containsKey('keys')) {ctx._source.keys += event.key} else {ctx._source.keys = [event.key]}"
  }
}
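The append-or-create logic of that Groovy update script can be sketched in Python like this (a rough equivalent for illustration; the names `index` and `upsert_keys` are hypothetical, with `index` again standing in for the ES index):

```python
def upsert_keys(index, doc_id, key):
    """Mimic the Groovy script: append 'key' to 'keys' if present, else create it."""
    # doc_as_upsert: create the document if it does not exist yet
    doc = index.setdefault(doc_id, {"id": doc_id})
    if "keys" in doc:          # ctx._source.containsKey('keys')
        doc["keys"].append(key)  # ctx._source.keys += event.key
    else:
        doc["keys"] = [key]      # ctx._source.keys = [event.key]

index = {}
upsert_keys(index, "1", {"key1": "toto1", "key2": "toto2"})
upsert_keys(index, "1", {"key1": "titi1", "key2": "titi2"})
upsert_keys(index, "2", {"key1": "tata1", "key2": "tata2"})
print(index["1"]["keys"])  # two nested objects for id 1
```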
To summarize, the two-pass loading is needed because the array creation requires a script, and scripting is only available with the update action.
Regarding "csv - append array of JSON in Logstash/Elasticsearch", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/36127961/