在我当前的ELK中,我正在获取日志,如果用户登录失败,我将得到“用户X失败”,而当用户成功登录时,我将获得“用户X成功”。 X也存储在单独的标签中(logged_user)。我想在给定的时间内找出谁是登录失败但没有与他们相关联的成功日志的用户
例如:用户A和用户B登录失败,但用户B成功登录。因此,我应该只在查询结果中看到报告了用户x。
有没有一种方法可以通过ELK / Kibana进行相同的处理,以便可以对其进行可视化显示。
最佳答案
elasticsearch中没有联接,因此您无法将索引与其自身联接并比较值。您的要求应该使用脚本化的指标聚合来解决,或者最好在客户端进行(获取该范围内的所有记录并对其进行过滤)。
我正在使用scripted metric aggregation创建具有唯一的feedbackIdentifierI和反馈值的字典。
Executed prior to any collection of documents. Allows the aggregation to set up any initial state.
已声明哈希表交易”
Executed once per document collected Loop through all document and add unique logged_user and corresponding timestamp and message value to dictionary. I am getting last entry of that user in given time range
Executed once on each shard after document collection is complete
返回所有碎片的字典
Executed once on the coordinating node after all shards have returned their results
再次遍历每个分片返回的所有字典,并创建一个唯一的字典。
数据:
"hits" : [
{
"_index" : "index31",
"_type" : "_doc",
"_id" : "4W2uC3IBFQk2sliOGJWu",
"_score" : 1.0,
"_source" : {
"timestamp" : "2020-05-13",
"logged_user" : "Abc",
"message" : "User Abc logged successful"
}
},
{
"_index" : "index31",
"_type" : "_doc",
"_id" : "4m2uC3IBFQk2sliOj5Wg",
"_score" : 1.0,
"_source" : {
"timestamp" : "2020-05-12",
"logged_user" : "Xyz",
"message" : "User Xyz loggin failed"
}
},
{
"_index" : "index31",
"_type" : "_doc",
"_id" : "422uC3IBFQk2sliOt5US",
"_score" : 1.0,
"_source" : {
"timestamp" : "2020-05-13",
"logged_user" : "Xyz",
"message" : "User Xyz loggin successful"
}
},
{
"_index" : "index31",
"_type" : "_doc",
"_id" : "5G3CC3IBFQk2sliO9ZW0",
"_score" : 1.0,
"_source" : {
"timestamp" : "2020-05-12",
"logged_user" : "Abc",
"message" : "User Abc loggin failed"
}
}
]
查询:
GET index31/_search
{
"query": {
"range": {
"timestamp": {
"gte": "2020-05-12",
"lte": "2020-05-13"
}
}
},
"aggs": {
"login_failed": {
"scripted_metric": {
"init_script": "state.transactions =new Hashtable();",
"map_script": """ def message="";
if(doc['message.keyword'].value.indexOf('failed')>-1)
{
message="failed";
}else{
message="successful";
}
if(state.transactions.get(doc['logged_user.keyword'])==null)
{
state.transactions.put(doc['logged_user.keyword'].value,[ doc.timestamp,message])
}else{
def user= state.transactions.get(doc['logged_user.keyword'].value);
if(user[0]<doc.timestamp)
{
user[0]= doc.timestamp;
user[1]=message;
}
}
""",
"combine_script": "return state.transactions",
"reduce_script": """
def loggins=new Hashtable();
for(a in states){
for(entry in a.entrySet()){
if(loggins.get(entry.getKey())==null){
loggins.put(entry.getKey(),[entry.getValue()[0],entry.getValue()[1]]);
}else{
def user= loggins.get(entry.getKey());
if(user[0]<entry.getValue())
{
user[0]= entry.getValue()[0];
user[1]= entry.getValue()[1];
}
}
}
}
def failed_loggins=new Hashtable();
for(entry in loggins.entrySet()){
if(entry.getValue()[1] =='failed')
{
failed_loggins.put(entry.getKey(),[entry.getValue()[0],entry.getValue()[1]]);
}
}
return failed_loggins;"""
}
}
}
}
Kabana可视化中不支持Scripted_metric聚合,因此,如果您尝试在kibana中进行可视化,则上述方法将不起作用(仅开发工具将支持它)
结果:
"aggregations" : {
"login_failed" : {
"value" : {
"Abc" : [
[
"2020-05-12T00:00:00.000Z"
],
"failed"
]
}
}
}
关于elasticsearch - 在ElasticSearch中有一个子查询类型搜索,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61754045/