python - Python 异步函数中的多个 Await

标签 python python-3.x python-asyncio aiohttp

我在自定义类中使用 aiohttp session 和信号量:

async def get_url(self, url):

    async with self.semaphore:
        async with self.session.get(url) as response:
            try:
                text_response = await response.text()
                read_response = await response.read()
                json_response = await response.json()
                await asyncio.sleep(random.uniform(0.1, 0.5))
            except aiohttp.client_exceptions.ContentTypeError:
                json_response = {}

            return {
                'json': json_response,
                'text': text_response,
                'read': read_response,
                'status': response.status,
                'url': response.url,
            }

我有两个问题:

  1. 在单个异步函数中包含多个await 语句是否正确?我需要返回response.text()和response.read()。但是,根据 URL,response.json() 可能可用,也可能不可用,因此我将所有内容放入 try/except block 中以捕获此异常。

  2. 由于我使用此函数循环访问不同 RESTful API 端点的列表,因此我通过信号量控制同时请求的数量(设置为最大值 100),但我还需要交错请求,以便它们不会干扰主机的日志。所以,我想我可以通过添加一个在 0.1-0.5 秒之间随机选择的 asyncio.sleep 来实现这一点。这是在请求之间强制执行短暂等待的最佳方法吗?我应该将其移动到函数的开头而不是接近结尾吗?

最佳答案

  1. 在一个异步函数中有多个等待是绝对可以的,只要你知道你在等待什么,并且每个等待都会被一一等待,就像非常正常的顺序执行一样。关于 aiohttp 需要提到的一件事是,你最好先调用 read() 并捕获 UnicodeDecodeError,因为在内部 text()json() 首先调用 read() 并处理其结果,您不希望处理阻止至少返回 read_response。您不必担心 read() 被多次调用,它只是在第一次调用时缓存在响应实例中。

  2. 随机交错是应对突发交通的简单有效的解决方案。但是,如果您想精确控制任何两个请求之间的最小时间间隔 - 出于学术原因,您可以设置两个信号量:

    def __init__(self):
        # something else
        self.starter = asyncio.Semaphore(0)
        self.ender = asyncio.Semaphore(30)
    

    然后更改 get_url() 以使用它们:

    async def get_url(self, url):
        await self.starter.acquire()
        try:
            async with self.session.get(url) as response:
                # your code
        finally:
            self.ender.release()
    

    因为 starter 初始化为零,所以所有 get_url() 协程将在 starter 上阻塞。我们将使用一个单独的协程来控制它:

    async def controller(self):
        last = 0
        while self.running:
            await self.ender.acquire()
            sleep = 0.5 - (self.loop.time() - last)  # at most 2 requests per second
            if sleep > 0:
                await asyncio.sleep(sleep)
            last = self.loop.time()
            self.starter.release()
    

    你的主程序应该是这样的:

    def run(self):
        for url in [...]:
            self.loop.create_task(self.get_url(url))
        self.loop.create_task(self.controller())
    

    所以一开始 Controller 会在15秒内均匀释放starter 30次,因为那是ender的初始值。之后,如果距上次释放 starter 已经过去了 0.5 秒,则一旦任何 get_url() 结束, Controller 就会释放 starter,否则它会等到那个时间。

    这里有一个问题:如果要获取的 URL 不是内存中的常量列表(例如,不断地来自网络,URL 之间存在不可预测的延迟),RPS 限制器将失败(在实际有要获取的 URL 之前启动程序过早发布) )。尽管流量爆发的可能性已经非常低,但您仍需要针对这种情况进行进一步调整。

关于python - Python 异步函数中的多个 Await,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50243031/

相关文章:

python-3.x - 如何知道哪些特征对预测目标类别影响更大?

python - 列表元素大小写转换

python - neo4j中索引和内部ID有什么区别?

python - 使用 Ruby/Python 实现远程命令行界面

python - 为类运行初始化器

Python 异步流 API

python - 如何仅打印 HTML 代码中的数字并对其求和?(Python)

javascript - 动态文本抓取

python - 尝试在 Atom 中运行 Hydrogen 时如何修复 "NotImplementedError"

python - asyncio.test_utils.run_briefly 到底是做什么的?