elasticsearch - 通过在另一个索引中找到的时间戳来过滤/聚合时间序列数据的一个elasticsearch索引

标签 elasticsearch time-series

数据

所以我有大量不同类型的时间序列数据。目前我选择将每种类型的数据放入自己的索引中,因为除了 4 个字段之外,所有数据都非常不同。此外,数据以不同的速率采样,并且不能保证在同一亚秒窗口内具有共同的时间戳,因此将它们全部融合到一个大文档中也不是一项简单的任务。

目标

我试图看看是否可以在 Elasticsearch 中完全解决的一个常见用例是根据另一个索引的查询返回的时间窗口返回一个索引的聚合结果。如图:

What i'd like to accomplish

这就是我想要实现的目标。

一些注意事项

对于“条件”数据上足够小的信号转换,我可以只使用日期直方图和顶部命中子聚合的某种组合,但是当我出现 10,000 或 100,000 次时,这种情况很快就会崩溃“条件”。此外,这只是一个“案例”,我有 100 组类似的情况,我想从中获得总体最小/最大。

这些比较基本上是在我认为的兄弟级别的文档或索引之间进行的,因此似乎没有任何明显的父子关系在很长一段时间内足够灵活运行,至少以当前数据的结构方式运行。

感觉应该有一个优雅的解决方案,而不是用一个查询的结果在 Elasticsearch 之外强力构建日期范围,并将 100 个时间范围输入到另一个查询中。

浏览文档,感觉 Elasticsearch 脚本和一些管道聚合的某种组合将是我想要的,但我并没有想到明确的解决方案。我确实可以使用社区中一些正确方向的指导。

谢谢。

最佳答案

我找到了一个适合我解决这个问题的“解决方案”。还没有任何人的答案,甚至没有评论,但我会发布我的解决方案,以防其他人来寻找类似的东西。我确信有很多改进和优化的机会,如果我发现这样的解决方案(可能通过脚本聚合),我会回来更新我的解决方案。

这可能不是最佳解决方案,但它对我有用。关键是利用 top_hitsserial_diffbucket_selector 聚合器。

“解决方案”

def time_edges(index, must_terms=[], should_terms=[], filter_terms=[], data_sample_accuracy_window=200):
    """
    Find the affected flights and date ranges where a specific set of terms occurs in a particular ES index.

    index: the Elasticsearch index to search
    terms: a list of dictionaries of form { "term": { "<termname>": <value>}}
    """
    query = {
        "size": 0,
        "timeout": "5s",
        "query": {
            "constant_score": {
                "filter": {
                    "bool": {
                        "must": must_terms,
                        "should": should_terms,
                        "filter": filter_terms
                    }
                }
            }
        },
        "aggs": {
            "by_flight_id": {
                "terms": {"field": "flight_id", "size": 1000},
                "aggs": {
                    "last": {
                        "top_hits": {
                            "sort": [{"@timestamp": {"order": "desc"}}],
                            "size": 1,
                            "script_fields": {
                                "timestamp": {
                                    "script": "doc['@timestamp'].value"
                                }
                            }
                        }
                    },
                    "first": {
                        "top_hits": {
                            "sort": [{"@timestamp": {"order": "asc"}}],
                            "size": 1,
                            "script_fields": {
                                "timestamp": {
                                    "script": "doc['@timestamp'].value"
                                }
                            }
                        }
                    },
                    "time_edges": {
                        "histogram": {
                            "min_doc_count": 1,
                            "interval": 1,
                            "script": {
                                "inline": "doc['@timestamp'].value",
                                "lang": "painless",
                            }
                        },
                        "aggs": {
                            "timestamps": {
                                "max": {"field": "@timestamp"}
                            },
                            "timestamp_diff": {
                                "serial_diff": {
                                    "buckets_path": "timestamps",
                                    "lag": 1
                                }
                            },
                            "time_delta_filter": {
                                "bucket_selector": {
                                    "buckets_path": {
                                        "timestampDiff": "timestamp_diff"
                                    },
                                    "script": "if (params != null && params.timestampDiff != null) { params.timestampDiff > " + str(data_sample_accuracy_window) + "} else { false }"
                                }
                            }
                        }
                    }
                }
            }

        }
    }

    return es.search(index=index, body=query)

分解事情

获取按“索引 2”过滤结果

the condition

    "query": {
        "constant_score": {
            "filter": {
                "bool": {
                    "must": must_terms,
                    "should": should_terms,
                    "filter": filter_terms
                }
            }
        }
    },

must_terms 是能够获取“索引 2”中存储的“条件”的所有结果所需的值。

例如,要将结果限制为仅过去 10 天,并且当 condition 为值 10 或 12 时,我们添加以下 must_terms

must_terms = [
    {
        "range": {
            "@timestamp": {
                "gte": "now-10d",
                "lte": "now"
            }
        }
    },
    {
        "terms": {"condition": [10, 12]}
    }
]

这会返回一组精简的文档,然后我们可以将其传递到聚合中以找出“样本”所在的位置。

聚合

对于我的用例,我们的飞机有“航类”的概念,因此我想按返回结果的 id 对返回结果进行分组,然后将所有发生的事件“分解”到存储桶中。

    "aggs": {
        "by_flight_id": {
            "terms": {"field": "flight_id", "size": 1000},


            ...


            }
        }

    }

您可以使用top_hits聚合获取第一次出现的上升沿和最后一次出现的下降沿

top_hits

    "last": {
        "top_hits": {
            "sort": [{"@timestamp": {"order": "desc"}}],
            "size": 1,
            "script_fields": {
                "timestamp": {
                    "script": "doc['@timestamp'].value"
                }
            }
        }
    },
    "first": {
        "top_hits": {
            "sort": [{"@timestamp": {"order": "asc"}}],
            "size": 1,
            "script_fields": {
                "timestamp": {
                    "script": "doc['@timestamp'].value"
                }
            }
        }
    },

您可以使用时间戳上的直方图来获取中间的样本。这会将返回的结果分成每个唯一时间戳的存储桶。这是一个成本高昂的聚合,但值得。使用内联脚本允许我们使用存储桶名称的时间戳值。

time_edges

    "time_edges": {
        "histogram": {
            "min_doc_count": 1,
            "interval": 1,
            "script": {
                "inline": "doc['@timestamp'].value",
                "lang": "painless",
            }
        },

        ...

    }

默认情况下,直方图聚合返回一组存储桶,其中包含每个存储桶的文档计数,但我们需要一个。这是 serial_diff 聚合工作所需的,因此我们必须对结果进行标记 max 聚合以获得返回的值。

    "aggs": {
        "timestamps": {
            "max": {"field": "@timestamp"}
        },
        "timestamp_diff": {
            "serial_diff": {
                "buckets_path": "timestamps",
                "lag": 1
            }
        },

        ...

    }

我们使用serial_diff的结果来确定两个桶是否近似相邻。然后,我们丢弃彼此相邻的样本,并使用 bucket_selector 聚合为我们的条件创建一个组合时间范围。这将丢弃小于我们的 data_sample_accuracy_window 的存储桶。该值取决于您的数据集。

    "aggs": {

        ...

        "time_delta_filter": {
            "bucket_selector": {
                "buckets_path": {
                    "timestampDiff": "timestamp_diff"
                },
                "script": "if (params != null && params.timestampDiff != null) { params.timestampDiff > " + str(data_sample_accuracy_window) + "} else { false }"
            }
        }
    }

serial_diff 结果对于我们确定条件设置的时间也至关重要。我们的桶的时间戳最终代表我们的条件信号的“上升”边缘,因此如果不进行一些后处理,下降边缘是未知的。我们使用 timestampDiff 值来确定下降沿在哪里。

关于elasticsearch - 通过在另一个索引中找到的时间戳来过滤/聚合时间序列数据的一个elasticsearch索引,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40249803/

相关文章:

python - 如何将以小时(h)为索引单位的pandas时间序列转换为pandas日期时间格式?

python - 在Python/Pandas中为每次多条记录的时间序列数据添加新的 'step'值列

elasticsearch - 使用Elasticsearch进行查询时遇到麻烦

macos - 如何在Mac上创建本地Elasticsearch集群

c# - 带有术语汇总的Elasticsearch日期直方图报告

python - 时间序列分析:How to plot these AR(1) graphs in python?

scala - 倾斜的窗口函数和 Hive 源分区?

elasticsearch - Rescore是否支持嵌套查询?

elasticsearch: cluster_block_exception TOO_MANY_REQUESTS/12/index 只读/允许删除 (api)

python - 如何计算 Pandas DataFrame 上的滚动累积乘积