python - celery 任务分组/聚合

标签 python asynchronous task celery aggregation

我计划使用 Celery 来处理由我的主服务器事件触发的推送通知和电子邮件的发送。

这些任务需要打开与外部服务器(GCM、APS、电子邮件服务器等)的连接。它们可以一次处理一个,也可以通过单个连接批量处理以获得更好的性能。

通常会在短时间内分别触发这些任务的多个实例。例如,在一分钟内,可能有几十个推送通知需要发送给具有不同消息的不同用户。

在 Celery 中处理这个问题的最佳方法是什么?似乎天真的方法是简单地为每条消息分配不同的任务,但这需要为每个实例打开一个连接。

我希望有某种任务聚合器允许我处理,例如'所有未完成的推送通知任务'。

有这样的东西吗?有没有更好的方法来解决这个问题,例如附加到事件任务组?

我错过了什么吗?

罗伯特

最佳答案

我最近发现并在我的项目中实现了 celery.contrib.batches 模块。在我看来,这是比 Tommaso 的回答更好的解决方案,因为您不需要额外的存储层。

这是一个例子 straight from the docs :

A click counter that flushes the buffer every 100 messages, or every 10 seconds. Does not do anything with the data, but can easily be modified to store it in a database.

# Flush after 100 messages, or 10 seconds.
@app.task(base=Batches, flush_every=100, flush_interval=10)
def count_click(requests):
    from collections import Counter
    count = Counter(request.kwargs['url'] for request in requests)
    for url, count in count.items():
        print('>>> Clicks: {0} -> {1}'.format(url, count))

不过要小心,它适合我的使用,但它在文档中提到这是一个“实验任务类”。这可能会阻止某些人使用具有如此易变描述的功能:)

关于python - celery 任务分组/聚合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/12556309/

相关文章:

python - 我怎么知道 Pandas Data Frame 中哪些是重复的行?

Python - 通过模块属性(按字符串名称)访问类实例

Python:平均分配给定数组中的数字

c# - 为什么异步代码比同步代码慢这么多

c# - Monitor.Enter 不会阻塞其他任务

c# - 等待方法什么时候退出程序?

python - 生成随机颜色 (RGB)

javascript - 如何计算 JavaScript 中异步函数的执行时间?

javascript - $(document).ready() 是在主线程中执行,还是异步?

c# 在执行之前构建任务列表