我在 Elasticsearch (+Kibana) 中有值,想制作一个图表,其中连接了某些节点。
我的字段是“prev”和“curr”,表示用户访问的“上一个”和“当前”页面。
例如:
- 上一个:主页,当前:Donald_Trump
- 上一个:其他内部,当前:El_Bienamado
- ...
所以我正在尝试做的是搜索值,当前值等于先前值,以便能够连接这些值并通过 Kibana 中的 Networkx-Graph 进行可视化。
我的问题是我昨天才开始使用查询语法,不知道这是否可行。 总而言之,我的目标是制作一个图,其中节点连接到一条链,例如:
- 主页 -> Donald_Trump -> Problems_in_Afrika -> 等
表示有人按特定顺序访问了这些页面。
我现在尝试的是:
def getPrevList():
previous = []
previousQuery = {
"size": 0,
"aggs": {
"topTerms": {
"terms": {
"field": "prev",
"size": 50000
}
}
}
}
results = es.search(index="wiki", body=previousQuery)["aggregations"]["topTerms"]["buckets"]
for bucket in results:
previous.append({
"prev" : bucket["key"],
"numDocs" : bucket["doc_count"]
})
return previous
prevs=getPrevList()
rowNum = 0;
totalNumReviews=0
for prevDetails in prevs:
rowNum += 1
totalNumDocs += prevDetails["numDocs"]
prevId = prevDetails["prev"]
q = {
"query": {
"bool": {
"must": [
{
"term": {"prev": prevId}
}
]
}
},
"controls": {
"sample_size": 10000,
"use_significance": True
},
"vertices": [
{
"field": "curr",
"size": VERTEX_SIZE,
"min_doc_count": 1
},
{
"field": "prev",
"size": VERTEX_SIZE,
"min_doc_count": 1
}
],
"connections": {
"query": {
"match_all": {}
}
}
}
最后,我将执行以下操作:
results = es.transport.perform_request('POST', "/wiki/_xpack/_graph/_explore", body=q)
# Use NetworkX to create a graph of prevs and currs we can analyze
G = nx.Graph()
for node in results["vertices"]:
G.add_node(nodeId(node), type=node["field"])
for edge in results["connections"]:
n1 = results["vertices"][int(edge["source"])]
n2 = results["vertices"][int(edge["target"])]
G.add_edge(nodeId(n1), nodeId(n2))
我从另一个例子中复制了它,效果很好,但我可以看到“连接”对于连接顶点很重要。
据我所知,我需要查询才能找到正确的“prev”字段。 目前控制并不重要。 对我来说复杂的部分来了:我在顶点和连接部分写了什么?我将顶点定义为 prev 和 curr 字段是否正确? 在连接查询中:现在我定义了“match_all”,但这显然是不正确的。我需要一个查询,在那里我可以“匹配”那些,其中 prev 等于 curr 并连接它们..但是如何??
感谢任何提示! 感谢转发。
编辑:
就像@Lupanoide 建议的那样,我修改了代码,现在有两个可视化效果: 第一个是第一个建议的解决方案,它给了我这张图(其中的一部分)(matplotlib,还不是 Kibana):
第二个解决方案看起来更疯狂并且更有可能是正确的,但我需要先在 Kibana 中将其可视化:
现在我的脚本的新结尾是:
gq = json.dumps(q)
workspaceID ="/f44c95c0-223d-11e9-b49e-bb0f8e1e7bae" # my v6.4.0 workspace
workspaceUrl = "graph#/workspace/"+workspaceID+"?query=" + urllib.quote_plus(gq)
doc = {
"url": workspaceUrl
}
res = es.index(index=connectionsIndexName, doc_type='task', id=0, body=doc)
我现在唯一的问题是,当我使用 Kibana 打开 URL 时,我看不到图表。相反,我得到了“新图表”页面。
编辑2 好的,我发送查询,但当然仅查询是不够的。我需要传递图形及其连接,对吗?可能吗?
非常感谢!
最佳答案
编辑:
对于您的用例,您需要找到具有相同 prev
值的字段 curr
的所有值。所以需要对某个页面之后点击的所有页面进行groupBy。你可以用 terms aggregation 做到这一点.
您需要构建一个查询,一方面通过术语聚合返回 prev
字段的所有值,然后针对生成的所有 curr
值进行聚合:
def getOccurrencyDict():
body = {
"size": 0,
"aggs": {
"getAllThePrevs": {
"terms": {
"field": "prev",
"size": 40000
},
"aggs": {
"getAllTheCurr": {
"terms": {
"field": "curr",
"size": 40000
}
}
}
}
}
}
result = es.search(index="my_index", doc_type="mydoctype", body=body)
然后您必须构建Networkx
库的class Graph()
接受的数据结构。所以你应该构建一个列表字典,然后将该 var 传递给 fromdictoflist方法:
dict2Graph = dict()
for res in result["aggregations"]["getAllThePrevs"]["buckets"]:
dict2Graph[ res["key"] ] = list() #you create a dict of list with a prev value key
dict2Graph[ res["key"] ].append(res["getAllTheCurr"]["buckets"]) # you append a list of dict composed by key `key` with the `curr` value, and key `doc_count` with the number of occurrence of the term `curr` before the term prev
现在将其传递给 networkx 摄取方法:
G=nx.from_dict_of_lists(dict2Graph)
我没有测试 networkx 摄取,所以如果它不起作用,那是因为我们在其中传递了一个字典列表而不是列表字典,所以你应该稍微改变一下你的构建方式dict2Graph
字典
如果聚合查询聚合太慢你应该使用prtition。请read here如何在弹性中实现分区聚合
编辑:
在阅读了 networkX 文档之后,您也可以这样做,而无需创建中间数据结构:
from elasticsearch import Elasticsearch
from elasticsearch.client.graph import GraphClient
es = Elasticsearch()
graph_client = GraphClient(es)
def createGraphInKibana(prev):
q = {
"query": {
"bool": {
"must": [
{
"term": {"prev": prev}
}
]
}
},
"controls": {
"sample_size": 10000,
"use_significance": True
},
"vertices": [
{
"field": "curr",
"size": VERTEX_SIZE,
"min_doc_count": 1
},
{
"field": "prev",
"size": VERTEX_SIZE,
"min_doc_count": 1
}
],
"connections": {
"query": {
"match_all": {}
}
}
}
graph_client.explore(index="your_index", doc_type="your_doc_type", body=q)
G = nx.Graph()
for prev in result["aggregations"]["getAllThePrevs"]["buckets"]:
createGraphInKibana(prev['key'])
for curr in prev["getAllTheCurr"]["buckets"]:
G.add_edge(prev["key"], curr["key"], weight=curr["doc_count"])
关于python - 如何通过 Python 查询在 Elasticsearch 中的不同字段中找到相等的值?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54387027/