django - Celery 限速 : Is it possible to rate-limit a celery task differently based on a run-time parameter?

标签 django redis queue celery message-queue

我想根据运行时确定的某些参数对 Celery 任务进行速率限制。例如:如果参数为 1,则速率限制可能为 100。如果参数为 2,则速率限制可能为 25。此外,我希望能够在运行时修改这些速率限制。

celery 是否提供了这样做的方法?我可以使用 routing_key 根据参数将任务发送到不同的队列,但 celery 似乎不支持队列级速率限制。

一个可能的解决方案是在排队任务时使用 eta,但我想知道是否有更好的方法来实现这一点。

最佳答案

Celery 提供了一个内置的速率限制系统,但它的工作方式与大多数人期望的速率限制系统不同,并且它有几个限制。我在 Redis 上实现了一个基于 ETA 和一些 Lua 脚本的分布式限速系统,它运行良好,所以我推荐这种方法。

这篇文章详细介绍了一种与那篇文章类似的方法:

https://callhub.io/distributed-rate-limiting-with-redis-and-celery/

我用了一个更简单的版本,我的lua脚本是这样的:

local current_time = tonumber(ARGV[1])
local eta = tonumber(redis.call('get', KEYS[1]))
local interval = tonumber(ARGV[2])

if not eta or eta < current_time then
    redis.call('set', KEYS[1], current_time + interval, 'EX', 10800)
    return nil
else
    redis.call('set', KEYS[1], eta + interval, 'EX', 10800)
    return tostring(eta)
end

我不得不简单地覆盖任务 apply_async 方法并以我想要的延迟调用该 lua 脚本:

def apply_async(self, *args, **kwargs):
    now = int(time.time())

    # From django-redis
    conn = get_redis_connection('default')

    cache_key = 'something'

    eta = conn.eval(self.rate_limit_script, 1, cache_key, now, rate_limiter.get_delay())

    if eta:
        eta = datetime.fromtimestamp(float(eta), tz=timezone.get_current_timezone())
        kwargs['eta'] = eta
    return super().apply_async(*args, **kwargs)

关于django - Celery 限速 : Is it possible to rate-limit a celery task differently based on a run-time parameter?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56027495/

相关文章:

Laravel 5.1 不处理排队作业

javascript - settimeout 并等待(如果已设置超时)

python - Google App Engine + 表单验证

python - Django 中的文件上传表单

mysql - Django ORM : Tried to do inner join with foreign key but causes FieldError

php - 避免重复预订/重复消费

redis - 如何解码 Redis 数据库中的值?

lua - 如何减少与 Redis 客户端的握手次数?

azure - BrokeredMessage Body 与 message.Properties 之间的区别?

Django manage.py 产生几个 fcgi 进程