我计划使用 Celery 来处理由我的主服务器事件触发的推送通知和电子邮件的发送。
这些任务需要打开与外部服务器(GCM、APS、电子邮件服务器等)的连接。它们可以一次处理一个,也可以通过单个连接批量处理以获得更好的性能。
通常会在短时间内分别触发这些任务的多个实例。例如,在一分钟内,可能有几十个推送通知需要发送给具有不同消息的不同用户。
在 Celery 中处理这个问题的最佳方法是什么?似乎天真的方法是简单地为每条消息分配不同的任务,但这需要为每个实例打开一个连接。
我希望有某种任务聚合器允许我处理,例如'所有未完成的推送通知任务'。
有这样的东西吗?有没有更好的方法来解决这个问题,例如附加到事件任务组?
我错过了什么吗?
罗伯特
最佳答案
我最近发现并在我的项目中实现了 celery.contrib.batches
模块。在我看来,这是比 Tommaso 的回答更好的解决方案,因为您不需要额外的存储层。
这是一个例子 straight from the docs :
A click counter that flushes the buffer every 100 messages, or every 10 seconds. Does not do anything with the data, but can easily be modified to store it in a database.
# Flush after 100 messages, or 10 seconds.
@app.task(base=Batches, flush_every=100, flush_interval=10)
def count_click(requests):
from collections import Counter
count = Counter(request.kwargs['url'] for request in requests)
for url, count in count.items():
print('>>> Clicks: {0} -> {1}'.format(url, count))
不过要小心,它适合我的使用,但它在文档中提到这是一个“实验任务类”。这可能会阻止某些人使用具有如此易变描述的功能:)
关于python - celery 任务分组/聚合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12556309/