背景
我需要向大约 100 万台设备发送大量通知,我正在使用 Google Cloud Functions 构建它。
在当前设置中,我将每个设备 token 作为 PubSub 消息排队:
- 在 DataStore 中存储待处理通知,用于跟踪重试和成功状态
- 尝试发送通知
- 如果重试次数足够多但仍未完成,则将通知标记为成功或失败
此过程由人工上传包含所有 token 的 CSV 手动启动。原则上,内置重试应该足够了,但我想确保,如果云功能本身或 APNs/FCM 出现问题,我可以以与上传相同的格式返回所有失败 token 的 CSV,以便当用户认为这是一个好主意时,他们只能重试失败的一次。
我将通知作为作业的一部分运行,用于与通知状态一起查询。为此,我在 job_id
和 status
上设置了复合索引,并对所有匹配的通知运行查询,并希望将其作为文件流式传输到用户或将其存储在 Google Cloud Storage 中,以便用户可以从那里下载。
问题
假设接近通知总数的失败并且我希望将所有 token 获取到一个文件中,我的第一个实现只是迭代所有匹配条目并构建结果。问题是,以这种方式检索每 100_000 个条目大约需要 1 分钟。对于接近所有通知的内容,这将使我超出云功能的最大超时时间。每个实体总共大约 300 字节,这使得整个导出大约 300MB。我可能可以通过添加一个更大的索引来将其大小减少到大约一半/三分之二,这样我就可以只为我想要的字段进行投影。
我能想到的唯一替代方案是将通知分片,将整个组分成 100 个分片,创建 100 个文件,每个文件包含 10k 通知,然后下载所有文件,并在用户尝试下载文件时将它们缝合在一起。
我发布这个问题的原因是,这感觉像是一个相对简单的问题,而这个解决方案感觉比我预期的要复杂一些,所以我想我可能会遗漏一些东西。
问题
- 我是否缺少一种明显、更简单的方法来实现我想要的目标?
- 分片是否正是处理此类问题的预期方式,而我应该接受其复杂性?
代码
为了清楚起见,这是我正在运行的代码片段,我只是迭代它返回的响应以生成输出。
def get_failures(job_id):
query = client.query(kind = Notification.kind)
query.add_filter('job_id', '=', str(job_id))
query.add_filter('status', '=', "failure")
return query.fetch()
最佳答案
此问题的可靠解决方案是使用 Google Dataflow。我目前使用它来完成此任务,在 Google Cloud Storage 中生成 csv 文件,其中包含与给定数据存储查询匹配的所有约 500k 记录。
不过,设置它可能会有点复杂。
在开始之前,我使用了 Google 任务队列,它有 10 分钟超时,而不是 30 秒超时。我不确定您是否可以纯粹在云函数中执行此操作,或者您是否需要创建一个简单的应用程序引擎项目来充当这些任务的请求处理程序
关于python - 谷歌云数据存储: Exporting entities that match a certain query,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54220605/