我目前有一个在 appengine 上运行的应用程序,并且我正在使用延迟库执行一些作业,其中一些任务每天运行,而其中一些任务每月执行一次。大多数这些任务查询数据存储以检索文档,然后将实体存储在索引中(搜索 API)。其中一些表每月更换一次,我必须在所有实体(4~5M)上运行这些任务。
此类任务的一个示例是:
def addCompaniesToIndex(cursor=None, n_entities=0, mindate=None):
#get index
BATCH_SIZE = 200
cps, next_cursor, more = Company.query().\
fetch_page(BATCH_SIZE,
start_cursor=cursor)
doc_list = []
for i in range(0, len(cps)):
cp = cps[i]
#create a Index Document using the Datastore entity
#this document has only about 5 text fields and one date field
cp_doc = getCompanyDocument(cp)
doc_list.append(cp_doc)
index = search.Index(name='Company')
index.put(doc_list)
n_entities += len(doc_list)
if more:
logging.debug('Company: %d added to index', n_entities)
#to_put[:] = []
doc_list[:] = []
deferred.defer(addCompaniesToIndex,
cursor=next_cursor,
n_entities=n_entities,
mindate=mindate)
else:
logging.debug('Finished Company index creation (%d processed)', n_entities)
当我仅运行一项任务时,每个延迟任务的执行大约需要 4-5 秒,因此索引我的 500 万个实体大约需要 35 小时。
另一件事是,当我在同一队列上使用不同的延迟任务对另一个索引(例如,每日更新之一)运行更新时,两者的执行速度都要慢得多。每次延迟调用大约需要 10-15 秒,这是难以忍受的。
我的问题是:有没有一种方法可以更快地完成此操作并将推送队列扩展到每次运行多个作业?或者我应该使用不同的方法来解决这个问题?
提前致谢,
最佳答案
通过将 if more
语句放在 addCompaniesToIndex()
函数的末尾,您实际上是在序列化任务执行:直到当前的延迟任务已完成对其文档份额的索引。
您可以做的是将 if more
语句移到 Company.query().fetch_page()
调用之后,您可以在其中获取(大部分)所需的变量用于下一个延迟任务的执行。
这样,下一个延迟任务将在当前任务完成之前创建并排队(长时间),因此它们的处理可能会重叠/交错。您还需要一些其他修改,例如处理 n_entities
变量,该变量在更新的场景中失去其当前含义 - 但这或多或少是装饰/信息性的,对于实际的文档索引操作来说并不是必需的。
如果延迟任务的数量非常多,则存在同时对太多任务进行排队的风险,这可能会导致 GAE 为处理这些任务而产生的实例数量“爆炸”。在这种情况下,您可以通过稍微延迟执行来“限制”延迟任务的生成速度,请参阅 https://stackoverflow.com/a/38958475/4495081 .
关于python - 谷歌应用引擎 : Task queue performance,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40939072/