我有几个阻塞函数 foo
、bar
并且我无法更改它们(一些我无法控制的内部库。与一个或多个网络服务通信) .我如何将它用作异步?例如。我不想做以下事情。
results = []
for inp in inps:
val = foo(inp)
result = bar(val)
results.append(result)
这将是低效的,因为我可以为第二个输入调用 foo
,而我正在等待第一个输入,对于 bar
也是如此。我如何包装它们以便它们可以与 asyncio 一起使用(即新的 async
、await
语法)?
让我们假设函数是可重入的。即,当先前的 foo
已经在处理时,再次调用 foo
就可以了。
更新
使用可重复使用的装饰器扩展答案。点击here例如。
def run_in_executor(f):
@functools.wraps(f)
def inner(*args, **kwargs):
loop = asyncio.get_running_loop()
return loop.run_in_executor(None, functools.partial(f, *args, **kwargs))
return inner
最佳答案
这里有(有点)两个问题:
- 如何在协程中异步运行阻塞代码
- 如何在“同一”时间运行多个异步任务(顺便说一句:asyncio 是单线程的,所以它是 concurrent, but not truly parallel)。
可以使用高级 asyncio.create_task
创建并发任务或低级 asyncio.ensure_future
.从 3.11 开始,它们也可以通过 asyncio task groups 创建,由 Trio 库开创(Trio 的创建者有一篇关于该主题的优秀博客文章 here)。
要运行同步代码,您需要 run the blocking code in an executor .示例:
import concurrent.futures
import asyncio
import time
def blocking(delay):
time.sleep(delay)
print('Completed.')
async def non_blocking(executor):
loop = asyncio.get_running_loop()
# Run three of the blocking tasks concurrently. asyncio.wait will
# automatically wrap these in Tasks. If you want explicit access
# to the tasks themselves, use asyncio.ensure_future, or add a
# "done, pending = asyncio.wait..." assignment
await asyncio.wait(
fs={
# Returns after delay=12 seconds
loop.run_in_executor(executor, blocking, 12),
# Returns after delay=14 seconds
loop.run_in_executor(executor, blocking, 14),
# Returns after delay=16 seconds
loop.run_in_executor(executor, blocking, 16)
},
return_when=asyncio.ALL_COMPLETED
)
executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
asyncio.run(non_blocking(executor))
如果您想使用 for 循环(如您的示例)安排这些任务,您有几种不同的策略,但基本方法是使用 for 循环(或列表)安排任务理解等),用 asyncio.wait 等待它们,然后然后检索结果。示例:
done, pending = await asyncio.wait(
fs=[loop.run_in_executor(executor, blocking_foo, *args) for args in inps],
return_when=asyncio.ALL_COMPLETED
)
# Note that any errors raise during the above will be raised here; to
# handle errors you will need to call task.exception() and check if it
# is not None before calling task.result()
results = [task.result() for task in done]
关于python - 如何将 asyncio 与现有的阻塞库一起使用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41063331/