python - Telegram 机器人的 API 速率限制

标签 python api flask gunicorn telegram-bot

我正在开发一个带有模块 pyTelegramBotAPI 的机器人,它通过 webhooks 工作,并安装 Flask+Gunicorn 作为 webhooks 的服务器。 Gunicorn 正在与 5 个工作人员合作以提高速度,我的项目结构如下所示:

app.py
bot.py

在 bot.py 中,我有一个处理更新的函数:

def process_the_update(update):
    logger.info(update)
    update = types.Update.de_json(update)
    bot.process_new_updates([update])

在app.py中我导入了这个函数,因此,每当更新到来时,app.py都会调用这个函数,并且bot将处理更新。在我的机器人中,用户可以调用命令,该命令将使用外部 api 来获取一些信息。问题是,这个外部 api 每秒有 3 个请求的限制。我需要配置一个具有这样的速率限制的机器人。首先我想到用队列来完成它,代码如下:

lock_queue = Queue(1)
requests_queue = Queue(3)
def api_request(argument):
    if lock_queue.empty():
        try:
            requests_queue.put_nowait(time.time())
        except queue.Full:
            lock_queue.put(1)
            first_request_time = requests_queue.get()
            logger.info('First request time: ' + str(first_request_time))
            current_time = time.time()
            passed_time = current_time - first_request_time
            if passed_time >= 1:
                requests_queue.put_nowait(time.time())
                lock_queue.get()
            else:
                logger.info(passed_time)
                time.sleep(1 - passed_time)
                requests_queue.put_nowait(time.time())
                lock_queue.get()
    else:
        lock_queue.put(1)
        first_request_time = vk_requests_queue.get()
        logger.info('First request time: ' + str(first_request_time))
        current_time = time.time()
        passed_time = current_time - first_request_time
        if passed_time >= 1:
            requests_queue.put_nowait(time.time())
            lock_queue.get()
        else:
            logger.info(passed_time)
            time.sleep(1 - passed_time)
            requests_queue.put_nowait(time.time())
            lock_queue.get()
    result = make_api_request(argument) # requests are made too by external module.
    return result 

逻辑是,正如我所想,因为模块 pyTelegramBotAPI 使用线程来更快地处理更新,所以所有线程都会检查 requests_queue,该队列的时间为最后 3 个 api_requests,因此发出的 3 个请求中的第一个请求的时间将是与当前时间相比(检查是否经过了一秒)。而且,因为我需要确定只有一个线程会同时进行这种比较,所以我创建了 lock_queue。 但是,问题是,首先,gunicorn 使用 5 个工作线程,因此总是有可能来自用户的所有消息将在不同的进程中处理,并且这些进程将有自己的队列。其次,即使我将工作人员数量设置为默认值(1 个工作人员),我仍然会收到 429 错误,所以我认为我的代码根本无法按我想要的方式工作。

我想用redis进行速率限制,所以每次在每个线程和进程机器人中都会检查最后3个请求的时间,但我仍然不确定这是正确的方法,而且我不确定,这个怎么写。

如果有人提出任何想法甚至代码示例(外部 api 不提供任何 x-rate-limit header ),我会很高兴

最佳答案

写了这个函数,使用redis来统计请求数(基于这个https://www.binpress.com/tutorial/introduction-to-rate-limiting-with-redis/155教程)

import redis

r_db = redis.Redis(port=port, db=db)

def limit_request(request_to_make, limit=3, per=1, request_name='test', **kwargs):
    over_limit_lua_ = '''
    local key_name = KEYS[1]
    local limit = tonumber(ARGV[1])
    local duration = ARGV[2]

    local key = key_name .. '_num_of_requests'
    local count = redis.call('INCR', key)
    if tonumber(count) > limit then
        local time_left = redis.call('PTTL', key)
        return time_left
    end
    redis.call('EXPIRE', key, duration)
    return -2
    '''

    if not hasattr(r_db, 'over_limit_lua'):
        r_db.over_limit_lua = r_db.register_script(over_limit_lua_)

    request_possibility = int(r_db.over_limit_lua(keys=request_name, args=[limit, per]))
    if request_possibility > 0:
        time.sleep(request_possibility / 1000.0)
        return limit_request(request_to_make, limit, per, request_name, **kwargs)
    else:
        request_result = request_to_make(**kwargs)
        return request_result

关于python - Telegram 机器人的 API 速率限制,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44742416/

相关文章:

python - Python 列表中的非零元素列表

python - 如何在 Odoo 12 中使用 Python XML-RPC 注册支付

javascript - XMLHTTPRequest 错误响应 header

java - 为大量外部 API 请求扩展软件/硬件?

python - 如何渲染和返回图以在 flask 中查看?

路由中的 Python3 Flask asyncio 子进程挂起

jquery - 即使使用 POST 方法也发送 GET 请求 - jQuery

python - 在 Pygame 中删除图像周围的边框

python - 访问 Flask-socketio session 时出现问题

python - Django 中同一行的多个字段