python - 大型数据集上的ElasticSearch聚合

标签 python pandas elasticsearch aggregation

我使用Python ElasticSearch API。
我的数据集太大,无法使用search()进行检索。
我可以使用helpers.scan()检索它,但是数据太大,无法用 Pandas 快速处理。
因此,我学习了如何使用ElasticSearch进行聚合以压缩数据,但是仍然使用search()我无法检索所有数据。我知道即使“汇总”只给出一行,汇总还是按照“常规”的搜索大小完成的?
最终,我尝试了聚合+扫描或滚动,但是我知道scan()或scroll()不能用于进行聚合,因为这些请求在数据集的子集上起作用,然后在子集上进行废话。
在非常大的数据集上进行聚合的好方法是什么?
我在网络上找不到任何相关的解决方案。
更明确地说,我的情况是:
我有X个成千上万的移动传感器,每小时传送一次最新的停止位置,即新的停止位置。从最后一站到新站的转移可能需要几天的时间,因此在几天里我没有每小时采购量的相关信息。
作为ElasticSearch搜索输出,我只需要格式的每一行:
sensor_id /最后停止/新停止

最佳答案

如果您将 flex 带与 Pandas 一起使用,则可以尝试eland一个新的官方 flex 带库,旨在更好地集成它们。尝试:

es = Elasticsearch() 

body = {
  "size": 0,
  "aggs": {
    "getAllSensorId": {
      "terms": {
        "field": "sensor_id",
        "size": 10000
      },
      "aggs": {
        "getAllTheLastStop": {
          "terms": {
            "field": "last_stop",
            "size": 10000
          },
      "aggs": {
        "getAllTheNewStop": {
          "terms": {
            "field": "new_stop",
            "size": 10000
          }
        }
      }
        }
      }
    }
  }
}
list_of_results = []
result = es.search(index="my_index", body=body)
for sensor in result["aggregations"]["getAllTheSensorId"]["buckets"]:
    for last in sensor["getAllTheLastStop"]["buckets"]:
        for new in last["getAllTheNewStop"]["buckets"]:
            record = {"sensor": sensor['key'], "last_stop": last['key'], "new_stop": new['key']}
            list_of_results.append(record)
            

关于python - 大型数据集上的ElasticSearch聚合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63305841/

相关文章:

python - 如何在 Python 中读取大文本文件?

python - 将字符串设置为 pandas DataFrame 的索引

elasticsearch - 为 `+` RFC7159 配置 ElasticSearch?

python - 一次用多个等号声明两个变量是pythonic吗?

python - 如何在 PyCharm 中自动生成文档字符串中的字段类型?

python - 删除评论标签但不满足于 BeautifulSoup

python - 获取 celery AsyncResult 的任务名称

Python Pandas 基于多个值字段合并两个数据框

symfony - Fos elastica过滤器

java - 在 Elasticsearch 中添加多个求和聚合