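I have a Python server that listens for POST requests from an external server. For every incident that occurs on the external server, I expect two JSON documents. One of the fields in the JSON documents is unique_key, which identifies the two documents as belonging together. When a JSON document is received, my Python server indexes it into Elasticsearch. The two documents related to an incident are indexed in Elasticsearch as follows:
/my_index/doc_type/doc_1
/my_index/doc_type/doc_2
That is, the documents belong to the same index and have the same document type, but I have no easy way to tell that the two documents are related. Since I can link them through unique_key, I would like to do some processing before inserting them into Elasticsearch. Do you have any ideas on how to normalize the two documents and merge them into a single JSON document? Keep in mind that I will receive a large number of these documents every second, so I need some temporary storage to hold and process the JSON documents. Can anyone suggest an approach?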
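One way to provide the temporary storage described above is to keep the first half of each incident in memory, keyed by unique_key, until its partner arrives. The sketch below is a minimal illustration under stated assumptions: exactly two documents share each key, a single server process receives both halves, and an in-process dict is an acceptable store (pending halves are lost on restart; several server processes would need a shared store instead). `PairBuffer`, `add`, and `evict_stale` are hypothetical names, not part of any library.

```python
import time
from typing import Any, Dict, Optional, Tuple

Doc = Dict[str, Any]


class PairBuffer:
    """Holds the first half of each incident until its partner arrives.

    Assumption: exactly two documents share each key, and an in-process
    dict is used as the temporary store (nothing survives a restart).
    """

    def __init__(self, max_age_seconds: float = 60.0) -> None:
        self.max_age_seconds = max_age_seconds
        # key -> (arrival time on the monotonic clock, first document)
        self._pending: Dict[str, Tuple[float, Doc]] = {}

    def add(self, key: str, doc: Doc) -> Optional[Tuple[Doc, Doc]]:
        """Return (first, second) in arrival order once both halves exist, else None."""
        entry = self._pending.pop(key, None)
        if entry is None:
            # First half: park it and wait for the partner.
            self._pending[key] = (time.monotonic(), doc)
            return None
        _, first = entry
        return first, doc

    def evict_stale(self) -> int:
        """Drop halves whose partner never arrived; return how many were dropped."""
        cutoff = time.monotonic() - self.max_age_seconds
        stale = [k for k, (t, _) in self._pending.items() if t < cutoff]
        for k in stale:
            del self._pending[k]
        return len(stale)

    def __len__(self) -> int:
        return len(self._pending)
```

The pair comes back in arrival order, so the caller still has to tell which half is which (in the structures shown below, only json_1 has a "data" field). Calling `evict_stale` periodically keeps incidents that never complete from growing the buffer without bound.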
Update: here is the basic structure of the JSON documents.
json_1
{
  "msg": "0",
  "tdxy": "1",
  "data": {
    "Metric": "true",
    "Severity": "warn",
    "Message": {
      "Session": "None",
      "TransId": "myserver.com-14d9e013794",
      "TransName": "dashboard.action",
      "Time": 0,
      "Code": 0,
      "CPUs": 8,
      "Lang": "en-GB",
      "Event": "false"
    },
    "EventTimestamp": "1433192761097"
  },
  "Timestamp": "1433732801097",
  "Host": "myserver.myspace.com",
  "Group": "UndefinedGroup"
}
json_2
{
  "Message": "Hello World",
  "Session": "4B5ABE9B135B7EHD49343865C83AD9E079",
  "TransId": "myserver.com-14d9e013794",
  "TransName": "dashboard.action",
  "points": [
    {
      "Name": "service.myserver.com:9065",
      "Host": "myserver.com",
      "Port": "9065"
    }
  ],
  "Points Operations": 1,
  "Points Exceeded": 0,
  "HEADER.connection": "Keep-Alive",
  "PARAMETER._": "1432875392706"
}
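The linking key sits at different paths in the two structures: json_1 nests TransId under data → Message, while json_2 has it at the top level. A line such as `json.loads(rx_buffer)['TransId']` therefore works for a json_2-shaped payload but would raise `KeyError` for a json_1-shaped one. A minimal sketch of a lookup that handles both shapes shown above; `extract_trans_id` is a hypothetical helper, and it assumes these two shapes are the only ones that arrive.

```python
from typing import Any, Dict


def extract_trans_id(doc: Dict[str, Any]) -> str:
    """Return the TransId from either document shape shown above."""
    # json_1 shape: {"data": {"Message": {"TransId": ...}}}
    data = doc.get("data")
    if isinstance(data, dict):
        message = data.get("Message")
        if isinstance(message, dict) and "TransId" in message:
            return message["TransId"]
    # json_2 shape: {"TransId": ...} at the top level
    if "TransId" in doc:
        return doc["TransId"]
    raise KeyError("TransId not found in document")
```

Usage: `txid = extract_trans_id(json.loads(rx_buffer))`.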
I have updated the code as suggested:
if rx_buffer:
    txid = json.loads(rx_buffer)['TransId']
    if `condition_1`:
        res = es.index(index='its', doc_type='vents', id=txid, body=rx_buffer)
        print(res['created'])
    elif `condition_2`:
        res = es.update(index='its', doc_type='vents', id=txid, body={"f_vent":{"b_vent":rx_buffer}})
I get the following error:
  File "/usr/lib/python2.7/site-packages/elasticsearch/transport.py", line 307, in perform_request
    status, headers, data = connection.perform_request(method, url, params, body, ignore=ignore, timeout=timeout)
  File "/usr/lib/python2.7/site-packages/elasticsearch/connection/http_urllib3.py", line 89, in perform_request
    self._raise_error(response.status, raw_data)
  File "/usr/lib/python2.7/site-packages/elasticsearch/connection/base.py", line 105, in _raise_error
    raise HTTP_EXCEPTIONS.get(status_code, TransportError)(status_code, error_message, additional_info)
RequestError: TransportError(400, u'ActionRequestValidationException[Validation Failed: 1: script or doc is missing;]')
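The `script or doc is missing` validation error comes from the update API: the request body must carry the partial document under a `doc` key (or supply a `script`), and `{"f_vent": {"b_vent": ...}}` has neither. Separately, `rx_buffer` is the raw JSON string, so even inside a valid body it would be stored as a string rather than as an object. A minimal sketch of a body builder that fixes both points, reusing the `f_vent`/`b_vent` field names from the code above; `partial_update_body` is a hypothetical helper name.

```python
import json
from typing import Any, Dict


def partial_update_body(rx_buffer: str) -> Dict[str, Any]:
    """Wrap the parsed second document in the "doc" key the update API requires."""
    second = json.loads(rx_buffer)  # store an object, not the raw JSON string
    return {"doc": {"f_vent": {"b_vent": second}}}


# Usage with elasticsearch-py (needs a running cluster, so left commented):
# es.update(index='its', doc_type='vents', id=txid, body=partial_update_body(rx_buffer))
```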
Best answer
The code below assumes you are using the official elasticsearch-py library, but it is easy to port to another library.
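You may also need a specific mapping for the combined documents in your doc_type, but that depends heavily on how you plan to query them later.
In any case, based on the discussion above, I would first index json1: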
from elasticsearch import Elasticsearch

es_client = Elasticsearch(hosts=[{"host": "localhost", "port": 9200}])

json1 = { ...JSON of the first document you've received... }

# extract the unique ID
# note: you might want to only take 14d9e013794 and ditch "myserver.com-" if that prefix is always constant
doc_id = json1['data']['Message']['TransId']

# index the first document
es_client.index(index="my_index", doc_type="doc_type", id=doc_id, body=json1)
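If you do drop the constant prefix, apply the same normalization to both documents so their IDs match. A minimal sketch, assuming the prefix really is always `myserver.com-`; `short_id` is a hypothetical helper name.

```python
def short_id(trans_id: str, prefix: str = "myserver.com-") -> str:
    """Drop a constant host prefix from a TransId, leaving other IDs unchanged."""
    # Assumes the prefix is constant across all incidents.
    if trans_id.startswith(prefix):
        return trans_id[len(prefix):]
    return trans_id
```

Usage: `doc_id = short_id(json1['data']['Message']['TransId'])` for the first document and `doc_id = short_id(json2['TransId'])` for the second.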
At this point json1 is stored in Elasticsearch. When you later receive the second document, json2, you can proceed like this:
json2 = { ...JSON of the second document you've received... }

# extract the unique ID
# note: same remark about keeping only the second part of the ID
doc_id = json2['TransId']

# make a partial update of your first document
es_client.update(index="my_index", doc_type="doc_type", id=doc_id, body={"doc": {"SecondDoc": json2}})
Note that SecondDoc can be any name you like; it is just a nested field that holds your second document. At this point you should have a document with ID 14d9e013794 (assuming you stripped the "myserver.com-" prefix) and the following content:
{
  "msg": "0",
  "tdxy": "1",
  "data": {
    "Metric": "true",
    "Severity": "warn",
    "Message": {
      "Session": "None",
      "TransId": "myserver.com-14d9e013794",
      "TransName": "dashboard.action",
      "Time": 0,
      "Code": 0,
      "CPUs": 8,
      "Lang": "en-GB",
      "Event": "false"
    },
    "EventTimestamp": "1433192761097"
  },
  "Timestamp": "1433732801097",
  "Host": "myserver.myspace.com",
  "Group": "UndefinedGroup",
  "SecondDoc": {
    "Message": "Hello World",
    "Session": "4B5ABE9B135B7EHD49343865C83AD9E079",
    "TransId": "myserver.com-14d9e013794",
    "TransName": "dashboard.action",
    "points": [
      {
        "Name": "service.myserver.com:9065",
        "Host": "myserver.com",
        "Port": "9065"
      }
    ],
    "Points Operations": 1,
    "Points Exceeded": 0,
    "HEADER.connection": "Keep-Alive",
    "PARAMETER._": "1432875392706"
  }
}
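The index-then-update flow assumes json1 arrives before json2. If the second document comes first, the plain `update` fails because the target document does not exist yet, and indexing json1 afterwards with `index` would replace the whole document, dropping `SecondDoc`. One way around this, sketched here assuming your Elasticsearch version supports `doc_as_upsert` in the update body, is to send both halves as upserting partial updates so arrival order no longer matters. `upsert_body` is a hypothetical helper name.

```python
from typing import Any, Dict, Optional


def upsert_body(doc: Dict[str, Any], field: Optional[str] = None) -> Dict[str, Any]:
    """Build an update body that merges `doc` into the target, creating it if missing.

    With field=None the document's keys are merged at the top level (json1);
    with a field name the document is nested under it (json2 -> "SecondDoc").
    """
    partial = doc if field is None else {field: doc}
    return {"doc": partial, "doc_as_upsert": True}


# Usage with elasticsearch-py, same index/doc_type/doc_id as above (needs a cluster):
# es_client.update(index="my_index", doc_type="doc_type", id=doc_id, body=upsert_body(json1))
# es_client.update(index="my_index", doc_type="doc_type", id=doc_id,
#                  body=upsert_body(json2, field="SecondDoc"))
```

If both halves can arrive almost simultaneously, the two updates may collide on the document version; the update API's `retry_on_conflict` parameter is intended for that case.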
Of course, you can apply any processing you need to json1 and json2 before indexing or updating them.
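If you buffer both halves yourself before touching Elasticsearch, as the question suggests, you can build the combined document in Python and write it with a single `index` call instead of an index followed by an update. A minimal sketch, assuming the same `SecondDoc` layout as the merged document above; `merge_pair` is a hypothetical helper name.

```python
import copy
from typing import Any, Dict


def merge_pair(json1: Dict[str, Any], json2: Dict[str, Any],
               field: str = "SecondDoc") -> Dict[str, Any]:
    """Combine the two halves into one document.

    json1's fields stay at the top level and json2 is nested under `field`,
    mirroring the partial update; deep copies leave the inputs unchanged.
    """
    merged = copy.deepcopy(json1)
    merged[field] = copy.deepcopy(json2)
    return merged
```

Usage: `es_client.index(index="my_index", doc_type="doc_type", id=doc_id, body=merge_pair(json1, json2))`. This is also the natural place for any normalization, such as renaming fields or converting the string timestamps, before the document is stored.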
Source: "python - Two JSON documents linked by a key", Stack Overflow: https://stackoverflow.com/questions/30544973/