python - 通过 key 链接的两个JSON文档

标签 python json elasticsearch

我有一个python服务器从外部服务器监听POST。我希望外部服务器上发生的每个incident都有两个JSON文档。 JSON文档中的字段之一是unique_key,可用于标识这两个文档属于同一文档。接收到JSON文档后,我的python服务器便加入了elasticsearch。与事件相关的两个文档将在 Elasticsearch 中进行如下索引。
/my_index/doc_type/doc_1/my_index/doc_type/doc_2
即文档属于相同的索引并且具有相同的文档类型。但是我没有一个容易的方法来知道这两个文档是相关的。当我可以在两个文档上使用unique_key链接这两个文档时,我想在插入ElasticSearch之前进行一些处理。您对这两个文档进行一些标准化并将它们合并到一个JSON文档中有什么想法。必须记住,我每秒将收到大量此类文件。我需要一些临时存储来存储和处理JSON文档。有人可以为解决这个问题提供一些建议吗?

作为更新,我在这里添加JSON文件的基本结构。
json_1

{
    "msg": "0",
    "tdxy": "1",
    "data": {

        "Metric": "true",
        "Severity": "warn",

        "Message": {
            "Session": "None",
            "TransId": "myserver.com-14d9e013794",
            "TransName": "dashboard.action",
            "Time": 0,
            "Code": 0,
            "CPUs": 8,
            "Lang": "en-GB",
            "Event": "false",
        },
        "EventTimestamp": "1433192761097"
    },
    "Timestamp": "1433732801097",
    "Host": "myserver.myspace.com",
    "Group": "UndefinedGroup"
}
json_2
{
    "Message": "Hello World",
    "Session": "4B5ABE9B135B7EHD49343865C83AD9E079",
    "TransId": "myserver.com-14d9e013794",  
    "TransName": "dashboard.action"
    "points": [
        {
            "Name": "service.myserver.com:9065",
            "Host": "myserver.com",
            "Port": "9065",

        }
    ],
    "Points Operations": 1,
    "Points Exceeded": 0,
    "HEADER.connection": "Keep-Alive",
    "PARAMETER._": "1432875392706",
}

我已经按照建议更新了代码。
      if rx_buffer:
           txid = json.loads(rx_buffer)['TransId']
            if `condition_1`: 
                res = es.index(index='its', doc_type='vents', id=txid, body=rx_buffer)
                print(res['created'])
            elif `condition_2`:
                res = es.update(index='its', doc_type='vents', id=txid, body={"f_vent":{"b_vent":rx_buffer}})                                                              

我收到以下错误。
 File "/usr/lib/python2.7/site-packages/elasticsearch/transport.py", line 307, in perform_request
    status, headers, data = connection.perform_request(method, url, params, body, ignore=ignore, timeout=timeout)
  File "/usr/lib/python2.7/site-packages/elasticsearch/connection/http_urllib3.py", line 89, in perform_request
    self._raise_error(response.status, raw_data)
  File "/usr/lib/python2.7/site-packages/elasticsearch/connection/base.py", line 105, in _raise_error
    raise HTTP_EXCEPTIONS.get(status_code, TransportError)(status_code, error_message, additional_info)
RequestError: TransportError(400, u'ActionRequestValidationException[Validation Failed: 1: script or doc is missing;]')

最佳答案

下面的代码假定您使用的是官方elasticsearch-py库,但是将代码轻松转换到另一个库很容易。

我们可能还需要为您的doc_type类型的组合文档创建一个特定的映射,但这在很大程度上取决于您以后如何查询它。

无论如何,基于上面的讨论,我将首先索引json1

from elasticsearch import Elasticsearch
es_client = Elasticsearch(hosts=[{"host": "localhost", "port": 9200}])

json1 = { ...JSON of the first document you've received... }

// extract the unique ID
// note: you might want to only take 14d9e013794 and ditch "myserver.com-" if that prefix is always constant
doc_id = json1['data']['Message']['TransID']

// index the first document
es_client.index(index="my_index", doc_type="doc_type", id=doc_id, body=json1)

此时,json1已存储在Elasticsearch中。然后,当您稍后获得第二个文档json2时,可以像这样继续:
json2 = { ...JSON of the first document you've received... }

// extract the unique ID
// note: same remark about keeping only the second part of the id
doc_id = json2['TransID']

// make a partial update of your first document
es_client.update(index="my_index", doc_type="doc_type", id=doc_id, body={"doc": {"SecondDoc": json2}})

请注意,SecondDoc可以是您在此处选择的任何名称,它只是一个嵌套字段,将包含您的第二个文档。

此时,您应该具有一个ID为14d9e013794和以下内容的文档:
{
  "msg": "0",
  "tdxy": "1",
  "data": {
    "Metric": "true",
    "Severity": "warn",
    "Message": {
      "Session": "None",
      "TransId": "myserver.com-14d9e013794",
      "TransName": "dashboard.action",
      "Time": 0,
      "Code": 0,
      "CPUs": 8,
      "Lang": "en-GB",
      "Event": "false"
    },
    "EventTimestamp": "1433192761097"
  },
  "Timestamp": "1433732801097",
  "Host": "myserver.myspace.com",
  "Group": "UndefinedGroup",
  "SecondDoc": {
    "Message": "Hello World",
    "Session": "4B5ABE9B135B7EHD49343865C83AD9E079",
    "TransId": "myserver.com-14d9e013794",
    "TransName": "dashboard.action",
    "points": [
      {
        "Name": "service.myserver.com:9065",
        "Host": "myserver.com",
        "Port": "9065"
      }
    ],
    "Points Operations": 1,
    "Points Exceeded": 0,
    "HEADER.connection": "Keep-Alive",
    "PARAMETER._": "1432875392706"
  }
}

当然,您可以在索引/更新它们之前对json1json2进行任何处理。

关于python - 通过 key 链接的两个JSON文档,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30544973/

相关文章:

javascript - Bokeh HoverTool 自定义字体和来自 Python 的换行符

ajax - 返回 JSON 对象时出现 406 错误 – 意外内容

elasticsearch - 使用elasticsearch在两个值之间搜索价格

elasticsearch - 更像这个查询没有被序列化 - NEST

elasticsearch - Elasticsearch 街道范围搜索

java - 为什么 Jython Servlet post 请求中 fieldStorage 为空

python - Jinja2 include & extends 没有按预期工作

python - Spark SQL 读取 40 万行时出现内存不足错误

angularjs - 将 JSON.STRINGIFY 转换为 json 数组长度不计算

java - 如何使用 Java 上的 Selenium WebDriver 从 json 调用(Post、Get、JSON)中获取任何值?