python - 如何确保在ES API中捕获所有数据?

标签 python elasticsearch

我正在尝试在Python中创建一个API,以从ES中提取数据并将其提供到数据仓库中。数据是实时的,每秒都会被填充,因此我将创建一个近实时的管道。
当前的URL格式为{{url}}/{{index}}/_search,我发送的测试有效负载为:

{
   "from" : 0,
   "size" : 5
}
在下一次刷新时,它将使用有效负载进行拉取:
{
   "from" : 6,
   "size" : 5
}
依此类推,直到达到记录总数。 PROD环境大约有2.5亿行,我将每个提取的大小设置为10K。
我很担心,因为我不知道是否在ES中对记录进行了重新排序。当前,有一个插件使用用户生成的时间戳,但存在缺陷,因为有时会跳过文档,这是因为json的延迟可用于ES中的提取,并且可能是生成时间的方式。
有谁知道使用/_search提取数据时的默认排序是什么?

最佳答案

我想您正在寻找的是流/更改API,它是described by @Val here很好,也是一个开放的feature request
同时,您不能真正依靠sizefrom参数-您可能会进行冗余查询并在重复项到达数据仓库之前对其进行处理。
另一个选择是在这方面跳过ES并直接流到仓库?我的意思是,将一次ES快照拍摄到给定时间一次(这样就可以保存历史数据),将其馈送到仓库,然后直接从将数据获取到仓库的任何地方进行流式处理。

附录
AFAIK默认排序是按插入日期。但是没有内部_insertTime或类似的代码。您可以使用游标-这称为滚动,而这是一个py implementation。但这是从“最新”文档到“第一”文档的,而不是相反。因此,它将为您提供所有现有文档,但是在滚动时我不确定新添加的文档。然后,您想再次运行次佳的滚动。
您也可以pre-sort your index,当结合滚动使用时,它应该可以很好地用于您的用例。

关于python - 如何确保在ES API中捕获所有数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63341569/

相关文章:

elasticsearch - 带有嵌套对象的Elasticsearch查询

python - 如何使用 scikit-learn 在 Python 中打印简单线性回归的截距和斜率?

python:找到空条目时将列表分成几部分

search - 如何使用 Elasticsearch 中的 Solr 索引

optimization - Elasticsearch查询优化

angular - Firebase列表对象到Angular 2中的可观察数组

elasticsearch - Elasticsearch 动态模板不起作用

python - 在 Google App Engine 上创建大量数据存储对象的最节省内存的方法是什么?

Python 3 和带有 conda 的 mysqlclient

用于获取受 DES/kerberos 保护的 URL 的 Python 脚本