python - 如何将 asyncio 与现有的阻塞库一起使用?

标签 python python-3.x async-await python-3.5 python-asyncio

我有几个阻塞函数 foobar 并且我无法更改它们(一些我无法控制的内部库。与一个或多个网络服务通信) .我如何将它用作异步?例如。我不想做以下事情。

results = []
for inp in inps:
    val = foo(inp)
    result = bar(val)
    results.append(result)

这将是低效的,因为我可以为第二个输入调用 foo,而我正在等待第一个输入,对于 bar 也是如此。我如何包装它们以便它们可以与 asyncio 一起使用(即新的 asyncawait 语法)?

让我们假设函数是可重入的。即,当先前的 foo 已经在处理时,再次调用 foo 就可以了。


更新

使用可重复使用的装饰器扩展答案。点击here例如。

def run_in_executor(f):
    @functools.wraps(f)
    def inner(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return loop.run_in_executor(None, functools.partial(f, *args, **kwargs))

    return inner

最佳答案

这里有(有点)两个问题:

  1. 如何在协程中异步运行阻塞代码
  2. 如何在“同一”时间运行多个异步任务(顺便说一句:asyncio 是单线程的,所以它是 concurrent, but not truly parallel)。

可以使用高级 asyncio.create_task 创建并发任务或低级 asyncio.ensure_future .从 3.11 开始,它们也可以通过 asyncio task groups 创建,由 Trio 库开创(Trio 的创建者有一篇关于该主题的优秀博客文章 here)。

要运行同步代码,您需要 run the blocking code in an executor .示例:

import concurrent.futures
import asyncio
import time

def blocking(delay):
    time.sleep(delay)
    print('Completed.')


async def non_blocking(executor):
    loop = asyncio.get_running_loop()
    # Run three of the blocking tasks concurrently. asyncio.wait will
    # automatically wrap these in Tasks. If you want explicit access
    # to the tasks themselves, use asyncio.ensure_future, or add a
    # "done, pending = asyncio.wait..." assignment
    await asyncio.wait(
        fs={
            # Returns after delay=12 seconds
            loop.run_in_executor(executor, blocking, 12),
            
            # Returns after delay=14 seconds
            loop.run_in_executor(executor, blocking, 14),
            
            # Returns after delay=16 seconds
            loop.run_in_executor(executor, blocking, 16)
        },
        return_when=asyncio.ALL_COMPLETED
    )

executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
asyncio.run(non_blocking(executor))

如果您想使用 for 循环(如您的示例)安排这些任务,您有几种不同的策略,但基本方法是使用 for 循环(或列表)安排任务理解等),用 asyncio.wait 等待它们,然后然后检索结果。示例:

done, pending = await asyncio.wait(
    fs=[loop.run_in_executor(executor, blocking_foo, *args) for args in inps],
    return_when=asyncio.ALL_COMPLETED
)

# Note that any errors raise during the above will be raised here; to
# handle errors you will need to call task.exception() and check if it
# is not None before calling task.result()
results = [task.result() for task in done]

关于python - 如何将 asyncio 与现有的阻塞库一起使用?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41063331/

相关文章:

python - Dict of Dict 到 CSV(带有已定义的 header )

c# - 有没有一种优雅的方法来执行与嵌套 Task.WhenAlls 等效的操作?

javascript - 在 Javascript 异步函数之外访问 JSON 数组元素

c# - 在使用 Task.Run() 创建的任务上调用 await

python - 如何简化将具有某些值的列添加到我的数据框中的过程?

python - 2 类分类中的回归误差

python - 在python中添加动态表名

python - 将数据帧写入 pandas 中的 csv 时,选项卡未显示

python - sklearn 的 DecisionTreeClassifier 中的 "splitter"属性有什么作用?

python - 如何对 pandas 数据框进行非规范化