python - How do I throttle a script that creates Celery tasks faster than they are consumed?

Tags: python queue task celery

I have a script that generates millions of Celery tasks, one per row in a database. Is there a way to throttle it so it doesn't completely flood Celery?

Ideally I want to keep Celery busy, but I don't want the Celery queue to grow beyond a few dozen tasks, since that is just a waste of memory (especially because, without some kind of limit, the script would add millions of tasks to the queue almost instantly).

Best Answer

I've spent some time on this over the past few days and came up with what I call a CeleryThrottle object. Basically, you tell it how many items you want in the queue, and it does its best to keep the queue length between that size and 2× that size.

Here's the code (it assumes a Redis broker, but that is easy to change):

# coding=utf-8
import time
from collections import deque

import redis
from django.conf import settings
from django.utils.timezone import now


def get_queue_length(queue_name='celery'):
    """Get the number of tasks in a celery queue.

    :param queue_name: The name of the queue you want to inspect.
    :return: the number of items in the queue.
    """
    r = redis.StrictRedis(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=settings.REDIS_DATABASES['CELERY'],
    )
    return r.llen(queue_name)


class CeleryThrottle(object):
    """A class for throttling celery."""

    def __init__(self, min_items=100, queue_name='celery'):
        """Create a throttle to prevent celery run aways.

        :param min_items: The minimum number of items that should be enqueued. 
        A maximum of 2× this number may be created. This minimum value is not 
        guaranteed and so a number slightly higher than your max concurrency 
        should be used. Note that this number includes all tasks unless you use
        a specific queue for your processing.
        """
        self.min = min_items
        self.max = self.min * 2

        # Variables used to track the queue and wait-rate
        self.last_processed_count = 0
        self.count_to_do = self.max
        self.last_measurement = None
        self.first_run = True

        # Use a fixed-length queue to hold last N rates
        self.rates = deque(maxlen=15)
        self.avg_rate = self._calculate_avg()

        # For inspections
        self.queue_name = queue_name

    def _calculate_avg(self):
        return float(sum(self.rates)) / (len(self.rates) or 1)

    def _add_latest_rate(self):
        """Calculate the rate that the queue is processing items."""
        right_now = now()
        elapsed_seconds = (right_now - self.last_measurement).total_seconds()
        self.rates.append(self.last_processed_count / elapsed_seconds)
        self.last_measurement = right_now
        self.last_processed_count = 0
        self.avg_rate = self._calculate_avg()

    def maybe_wait(self):
        """Stall the calling function or let it proceed, depending on the queue.

        The idea here is to check the length of the queue as infrequently as
        possible while keeping the number of items in the queue between
        self.min and self.max as consistently as possible.

        We do this by immediately enqueueing self.max items. After that, we 
        monitor the queue to determine how quickly it is processing items. Using 
        that rate we wait an appropriate amount of time or immediately press on.
        """
        self.last_processed_count += 1
        if self.count_to_do > 0:
            # Do not wait. Allow process to continue.
            if self.first_run:
                self.first_run = False
                self.last_measurement = now()
            self.count_to_do -= 1
            return

        self._add_latest_rate()
        task_count = get_queue_length(self.queue_name)
        if task_count > self.min:
            # Estimate how long the surplus will take to complete and wait that
            # long + 5% to ensure we're below self.min on next iteration.
            surplus_task_count = task_count - self.min
            wait_time = (surplus_task_count / self.avg_rate) * 1.05
            time.sleep(wait_time)

            # Assume we're below self.min due to waiting; max out the queue.
            if task_count < self.max:
                self.count_to_do = self.max - self.min
            return

        else:
            # At or below self.min. Add more items.
            self.count_to_do = self.max - task_count
            return
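
The answer above assumes Redis, and as noted only get_queue_length needs to change for another broker. As a sketch (not part of the original answer), on RabbitMQ a passive queue_declare with the pika client reports the number of ready messages; the localhost connection parameters here are assumptions you would adapt to your setup:

import pika


def get_queue_length_rabbitmq(queue_name='celery'):
    """Get the number of ready messages in a RabbitMQ queue.

    A passive declare does not create or modify the queue; it only
    inspects it, and raises an error if the queue does not exist.
    """
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host='localhost')  # assumption: local broker
    )
    try:
        channel = connection.channel()
        result = channel.queue_declare(queue=queue_name, passive=True)
        return result.method.message_count
    finally:
        connection.close()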

Usage looks like this:

throttle = CeleryThrottle()
for item in really_big_list_of_items:
    throttle.maybe_wait()
    my_task.delay(item)
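
One note from the docstring: the count includes every task in the queue, so if other work shares the default celery queue, you will get better results by routing the throttled tasks to a dedicated queue. A hypothetical variant (the 'import' queue name is made up, and my_task stands in for your own task):

throttle = CeleryThrottle(min_items=200, queue_name='import')
for item in really_big_list_of_items:
    throttle.maybe_wait()
    my_task.apply_async(args=[item], queue='import')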

Dead simple, and hopefully quite flexible. With it in place, the code will monitor your queue and add waits to your loop whenever the queue gets too long. This is in our github repo, in case there are updates.

As it runs, it tracks a rolling average of the task processing rate and tries not to check the queue length more often than necessary. For example, if each task takes two minutes to run, then after putting 100 items in the queue it can wait a long time before checking the queue length again. A simpler version of this script could check the queue length on every pass through the loop, but that would add unnecessary delay. This version tries to be smart about it, at the cost of occasionally getting it wrong (in which case the queue dips below min_items).
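
To make the wait arithmetic concrete, here is a rough worked example with made-up numbers that mirrors the calculation in maybe_wait:

# Hypothetical measurement: 100 tasks finished in 120 seconds, so the
# observed rate is 100 / 120 ≈ 0.83 tasks per second.
avg_rate = 100 / 120.0

# Suppose the next queue check finds 180 tasks and min_items is 100.
# The surplus is 80 tasks, and the throttle sleeps long enough for the
# workers to clear it, plus 5% of slack:
surplus_task_count = 180 - 100
wait_time = (surplus_task_count / avg_rate) * 1.05  # ≈ 100.8 seconds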

Regarding python - How do I throttle a script that creates Celery tasks faster than they are consumed?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/43334727/
