python - 协程外的Aiohttp ClientSession

标签 python multithreading python-asyncio aiohttp

我有一个 REST API 包装器,它应该在交互式 Python session 中运行。 HTTP 请求既可以通过自动后台线程(使用 API 包装器)发出,也可以由最终用户通过交互式 session 手动发出。我正在尝试将所有 HTTP 请求管理从以前的 new-thread-per-request 方法迁移到 asyncio,但由于我无法在主线程中运行 asyncio 循环(它必须免费用于临时 Python 命令/请求),我编写了以下代码以在后台线程中运行它:

import aiohttp
import asyncio
from concurrent.futures import ThreadPoolExecutor

def start_thread_loop(pool=None):
    """Starts thread with running loop, bounding the loop to the thread"""
    def init_loop(loop):
        asyncio.set_event_loop(loop)  # bound loop to thread
        loop.run_forever()
    _pool = ThreadPoolExecutor() if pool is None else pool
    loop = asyncio.new_event_loop()
    future = _pool.submit(init_loop, loop)
    return future, loop

def send_to_loop(coro, loop):
    """Wraps couroutine in Task object and sends it to given loop"""
    return asyncio.run_coroutine_threadsafe(coro, loop=loop)

实际的 API 包装器如下所示:

class Foo:
    def __init__(self):
        _, self.loop = start_thread_loop()
        self.session = aiohttp.ClientSession(loop=self.loop)
        self.loop.set_debug(True)

    def send_url(self, url):
        async def _request(url):
            print('sending request')
            async with self.session.get(url) as resp:
                print(resp.status)
        return send_to_loop(_request(url), self.loop)

但是,aiohttp 强烈建议不要创建 ClientSession 在协程之外并在初始化 ClientSession 之前打开 asyncio Debug模式会引发 RuntimeError。因此,我尝试使用 asycio.Queue 制作一个略有不同的版本,以避免在协程中制作 ClientSession:

class Bar:

    def __init__(self):
        _, self.loop = start_thread_loop()
        self.q = asyncio.Queue(loop=self.loop)
        self.status = send_to_loop(self.main(), loop=self.loop)

    async def main(self):
        async with aiohttp.ClientSession(loop=self.loop) as session:
            while True:
                url = await self.q.get()
                print('sending request')
                asyncio.ensure_future(self._process_url(url, session), loop=self.loop)

    def send_url(self, url):
        send_to_loop(self.q.put(url), loop=self.loop)

    @staticmethod
    async def _process_url(url, session):
        async with session.get(url) as resp:
            print(resp.status)

但是,这种方法更复杂/冗长,我真的不明白是否真的有必要。

问题:

  1. 为什么在协程之外启动 ClientSession 会出现问题?
  2. 队列方法更好/更安全吗?如果是,为什么?
  3. 我在后台线程中启动循环的方法有什么问题吗?

最佳答案

Why is it a problem starting a ClientSession outside of a coroutine?

这就是 aiohttp 的构建方式,理论上应该可以在循环之外初始化某种客户端 session ,即。在协程之外,但这不是 aiohttp 的构建方式。 AFAIU the issue that introduced this warning , 这是因为 a) 很难测试 b) 容易出错

Is the queue approach better/safer? If so, why?

我不明白你想达到什么目的,所以我不确定如何回答。也许您遇到的问题是您尝试在构造函数 aka 中初始化 ClientSession。另一个类的 __init__。在这种情况下,您应该通过创建一个辅助方法来解决该问题,该辅助方法是一个将完成类初始化的协程。这是使用异步代码时的已知模式。

Is there any problem in my approach for starting a loop inside a background thread?

完全没问题。

关于python - 协程外的Aiohttp ClientSession,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43108418/

相关文章:

python - 如何使用 Python-Sphinx 生成 HTML 文档?

python - Flask 中的基本 HTTP 身份验证 AttributeError : 'NoneType' error

python - 在 Linux 上用 python 开发,在 Windows 上测试

java - 如何正确通知此适配器其数据集已更改?

python - 是否可以限制异步中同时运行的协程数量?

python - 在进程之间共享字典

java - Spring Batch 中的多线程步骤和本地分区有什么区别?

multithreading - 是否可以按随机顺序执行 celery 任务?

Python:依次等待协程列表

python-asyncio - 运行时错误: Event loop is closed after main coroutine is completed