python - 如何在GAE上处理大文件?

标签 python algorithm google-app-engine google-cloud-datastore app-engine-ndb

我正在寻找一种强大和快速的方式来处理谷歌应用程序引擎中的大文件。
它的工作原理如下(最后简化了工作流):
客户发送一个csv文件,我们的服务器将逐行处理。
文件上传后,将在ndb数据存储中添加一个条目,其中包含csv名称、文件路径(到google存储)和一些基本信息。然后,创建一个称为“预处理”的任务。
预处理任务将在csv文件的所有行(可能是数百万行)上循环,并将每个行的ndb条目添加到Uploads模型中,其中包含csv id、要提取/处理的行、要提取/处理的数据以及该行是否已开始处理并结束处理(“正在处理”、“已完成”)的某些指标(布尔值)。
预处理任务结束后,它会将信息更新到客户端“将处理XXX行”
呼叫UploadEntriesUploads.next()方法将:
搜索nextUploadEntries为false的is_treating
将在redis数据存储中为找到的下一行添加任务。(使用Redis数据存储是因为这里的工作是在非Google管理的服务器上进行的)
还将在任务is_done中创建一个新条目(该任务在5分钟后运行,并检查7)是否已正确执行如果没有,则认为Redis/外部服务器已失败,并执行与7相同的操作,但没有结果(“错误”)。
然后,它将该条目的Process-healthcheck更新为True。
外部服务器将处理数据,并通过向服务器上的端点发出POST请求来返回结果。
该端点更新数据存储中的UploadEntries.is_treating条目(包括“UploadEntries”和“is_treating”),并调用is_done开始下一行。
在uploads.next中,搜索下一个条目时不返回任何内容,我认为该文件将被最终处理,并调用任务Uploads.next(),该任务将用处理过的数据重建csv,并将其返回给客户。
有几件事要记住:
真正起作用的服务器不在google appengine中,这就是为什么我不得不提出redis。
当前的处理方式使我可以灵活处理并行条目的数量:在5)中,post-process方法包含一个Uploads.next()参数,让我并行搜索limit进程。可以是1,5,20,50。
我不能直接将n任务中的所有行添加到Redis,因为在这种情况下,下一个客户将不得不等待第一个文件完成处理,而这将占用太长时间
但这个系统有各种各样的问题,这就是我求助于你的原因:
有时,这个系统速度太快,以至于数据存储尚未正确更新,调用pre-processing时,返回的条目已在处理中(只是Uploads.next()尚未推送到数据库)
Redis或我的服务器(我不知道)有时会丢失任务,或者处理后没有发出POST请求,所以任务永远不会转到entry.is_treating = True这就是为什么我必须实现一个healcheck系统,以确保无论发生什么,都能正确地处理该行。这有一个双重优势:任务的名称包含csv ID和行使每个文件都是唯一的。如果数据存储不是最新的并且相同的任务被运行两次,那么HealthChIPT的创建将失败,因为相同的名称已经存在,让我知道有一个并发问题,所以我忽略了该任务,因为这意味着数据存储还没有更新。
我最初考虑在一个独立的进程中逐行运行文件,但这有一个很大的缺点,就是不能并行运行多行此外,对于专用目标,google将任务的运行时间限制为24小时(不是默认值),当文件非常大时,它可以运行超过24小时。
作为参考,如果有帮助的话,我将使用Python
为了简化工作流程,以下是我试图以最好的方式实现的目标:
处理一个大文件,运行多个并行进程,每行一个。
使用redis将工作发送到外部服务器。完成后,外部服务器通过POST请求将结果返回给主服务器
然后主服务器更新关于该行的信息,并转到下一行
如果有人能有更好的方法来做这件事,我会非常感激的我真的相信我不是第一个做这种工作的人,我很肯定我做得不对。
(我相信Stackoverflow是Stack Exchange中发布此类问题的最佳部分,因为它是一个算法问题,但也有可能我没有看到一个更好的网络如果是的话,我很抱歉)。

最佳答案

真正工作的服务器不在google appengine中
你考虑过用Google Cloud Dataflow来处理大文件吗?
它是一个托管服务,将为您处理文件拆分和处理。
基于最初的想法,这里有一个概要过程:
用户使用signed urls或blobstore API将文件直接上传到google云存储
appengine的请求启动一个小型计算引擎实例,该实例启动一个阻塞请求(BlockingDataflowPipelineRunner)以启动数据流任务。(由于沙箱和阻塞I/O问题,恐怕它需要是一个计算实例)。
数据流任务完成后,计算引擎实例将被解除阻止,并将消息发送到pubsub。
pubsub消息调用appengine服务上的webhook,该webhook将任务状态从“进行中”更改为“完成”,以便用户可以获取其结果。

关于python - 如何在GAE上处理大文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41014995/

相关文章:

algorithm - 如何用 Lua 进行代数运算?

algorithm - "Trim"整数类型的右零

google-app-engine - 我如何配置我的存储库和 TravisCI 以自动部署到 GAE 标准环境?

javascript - 将数字数组拆分为总和小于或等于给定数字的数组

python - 如何将 CNN 的输出剪辑为特定的颜色列表

python - django value_list 给出 id 而不是值

python - 删除 numpy 重新数组中的一行

使用 Google App Engine 的 Facebook 聊天机器人

google-app-engine - 在 Google App Engine 数据模型中进行 'OR' 查询

python - 如何在另一台计算机上运行 Python 项目而不在其上安装任何东西?