我想根据运行时确定的某些参数对 Celery 任务进行速率限制。例如:如果参数为 1,则速率限制可能为 100。如果参数为 2,则速率限制可能为 25。此外,我希望能够在运行时修改这些速率限制。
celery 是否提供了这样做的方法?我可以使用 routing_key 根据参数将任务发送到不同的队列,但 celery 似乎不支持队列级速率限制。
一个可能的解决方案是在排队任务时使用 eta
,但我想知道是否有更好的方法来实现这一点。
最佳答案
Celery 提供了一个内置的速率限制系统,但它的工作方式与大多数人期望的速率限制系统不同,并且它有几个限制。我在 Redis 上实现了一个基于 ETA 和一些 Lua 脚本的分布式限速系统,它运行良好,所以我推荐这种方法。
这篇文章详细介绍了一种与那篇文章类似的方法:
https://callhub.io/distributed-rate-limiting-with-redis-and-celery/
我用了一个更简单的版本,我的lua脚本是这样的:
local current_time = tonumber(ARGV[1])
local eta = tonumber(redis.call('get', KEYS[1]))
local interval = tonumber(ARGV[2])
if not eta or eta < current_time then
redis.call('set', KEYS[1], current_time + interval, 'EX', 10800)
return nil
else
redis.call('set', KEYS[1], eta + interval, 'EX', 10800)
return tostring(eta)
end
我不得不简单地覆盖任务 apply_async
方法并以我想要的延迟调用该 lua 脚本:
def apply_async(self, *args, **kwargs):
now = int(time.time())
# From django-redis
conn = get_redis_connection('default')
cache_key = 'something'
eta = conn.eval(self.rate_limit_script, 1, cache_key, now, rate_limiter.get_delay())
if eta:
eta = datetime.fromtimestamp(float(eta), tz=timezone.get_current_timezone())
kwargs['eta'] = eta
return super().apply_async(*args, **kwargs)
关于django - Celery 限速 : Is it possible to rate-limit a celery task differently based on a run-time parameter?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56027495/