python - 根据参数使函数异步

标签 python python-asyncio

我有一个发出 HTTP 请求然后返回响应的函数。我希望这个函数能够根据参数以阻塞或非阻塞模式运行。这在 Python 中可能吗?我想象的伪代码是这样的:

def maybe_async(asynchronous):
    if asynchronous:
        # We assume there's an event loop running and we can await
        return await send_async_http_request()
    else:
        # Perform a normal synchronous call
        return send_http_request()
这会抛出一个 SyntaxError我想找到一种方法来改写它,以便它抛出 RuntimeError: no running event loop在运行时如果 asynchronous=True但没有事件循环正在运行。

有两条评论说我只需要删除 await关键字来自 maybe_async ,但是我相信如果我们想对响应进行后处理,那么这无关紧要。这是一个更具体的用例:假设我想向最终用户提供一个函数,该函数从 GitHub API 收集所有事件 ID,并根据用户输入以阻塞或非阻塞模式执行此操作。这是我想做的事情:
import aiohttp
import requests


async def get_async(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

def get_sync(url):
    return requests.get(url).json()

def get(url, asynchronous):
    if asynchronous:
        return get_async(url)  # This is the suggested edit where
                               # I removed the await keyword
    else:
        return get_sync(url)

def get_github_events_ids(asynchronous=False):
    url = 'https://api.github.com/events'
    body = get(url, asynchronous)
    return [event['id'] for event in body]
但显然正在运行get_github_events_ids(True)引发 TypeError: 'coroutine' object is not iterable .
我的问题是:除了复制所有允许在同步和异步之间进行选择的功能之外,还有其他代码设计吗?

最佳答案

问题是在 asyncio 下,函数类似于 get_github_events_ids本身必须是异步的,或者返回通过调用(但不等待)异步函数获得的对象。这是为了允许他们在等待结果到达时暂停执行。
您可以为每个函数创建两个版本,一个用于等待,另一个用于运行代码,但这会导致大量代码重复。有一个更好的方法,但它需要一点魔法。
首先,在内部代码必须始终使用异步,因为这是在异步情况下传播暂停的唯一方法。但是在同步的情况下,它可以将同步调用返回的对象包装在可以简单等待的东西中(因为协程只会返回同步调用的结果),我们可以在顶层等待。我们将该操作称为“等待”。在异步情况的顶层,我们可以将协程对象返回给等待它的调用者,而在同步情况下,我们可以只驱动协程完成,我们将恰本地称为“驱动”的操作。
这是什么get_github_events_ids看起来像:

def get_github_events_ids(asynchronous=False):
    coro = _get_github_events_ids_impl(asynchronous)
    if asynchronous:
        return coro         # let the caller await
    else:
        return drive(coro)  # get sync result from "coroutine"
实现看起来总是异步的:
async def _get_github_events_ids_impl(asynchronous):
    url = 'https://api.github.com/events'
    body = await awaitify(get(url, asynchronous))
    return [event['id'] for event in body]

# "get", "get_sync", and "get_async" remain exactly as
# in the question
现在我们只需要定义 awaitifydrive魔法功能:
def awaitify(obj):
    if isinstance(obj, types.CoroutineType):
        return obj  # nothing to do, we're already async
    # return an async shim that will just return the object
    async def _identity():
        return obj
    return _identity()

def drive(coro):
    # coro is not really async, so we don't need an event loop or
    # anything, we just drive the coroutine object to completion.
    # Don't try this at home!
    while True:
        try:
            coro.send(None)
        except StopIteration as done:
            return done.value
要测试它,只需以两种方式运行它:
if __name__ == '__main__':
    # test sync
    print(get_github_events_ids(False))
    # test async
    print(asyncio.run(get_github_events_ids(True)))

关于python - 根据参数使函数异步,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65345322/

相关文章:

python - 通过后台线程访问 Tkinter 的文本小部件会导致崩溃

python - 如何检查列表是否只包含某个项目

python - 计算并绘制 (Y) 总计列表中每 (X) 项的平均值

python - 使用 Python 更新 Facebook 状态

python - 使异步事件循环响应 Windows(和 Unix)上的 KeyboardInterrupt

python-3.x - 为什么我收到 ValueError : too many file descriptors in select()?

python - aioredis + aiohttp 正确使用连接池

python - 取消异步上下文管理器

python - 如何在 Django Channels 中创建服务器端计时器/倒计时?

python - tuntap 的替代品