python - 如何使用 python 的 asyncio 模块正确创建和运行并发任务?

标签 python concurrency task python-3.4 python-asyncio

我正在尝试正确理解和实现两个同时运行的 Task使用 Python 3 相对较新的 asyncio 的对象模块。

简而言之,asyncio 似乎旨在通过事件循环处理异步进程和并发 Task 执行。它提倡使用 await(应用于异步函数)作为等待和使用结果的无回调方式,而不会阻塞事件循环。 ( future 和回调仍然是一个可行的选择。)

它还提供了 asyncio.Task() 类,这是 Future 的专门子类,旨在包装协程。最好使用 asyncio.ensure_future() 方法调用。 asyncio 任务的预期用途是允许独立运行的任务与同一事件循环中的其他任务“同时”运行。我的理解是 Tasks 连接到事件循环,然后自动继续驱动 await 语句之间的协程。

我喜欢能够使用并发任务而无需使用 Executor 之一的想法。类,但我没有找到关于实现的详细说明。

这就是我目前的做法:

import asyncio

print('running async test')

async def say_boo():
    i = 0
    while True:
        await asyncio.sleep(0)
        print('...boo {0}'.format(i))
        i += 1

async def say_baa():
    i = 0
    while True:
        await asyncio.sleep(0)
        print('...baa {0}'.format(i))
        i += 1

# wrap in Task object
# -> automatically attaches to event loop and executes
boo = asyncio.ensure_future(say_boo())
baa = asyncio.ensure_future(say_baa())

loop = asyncio.get_event_loop()
loop.run_forever()

在尝试同时运行两个循环任务的情况下,我注意到除非任务具有内部 await 表达式,否则它将卡在 while循环,有效地阻止其他任务运行(很像普通的 while 循环)。但是,一旦任务必须(a)等待,它们似乎可以同时运行而没有问题。

因此,await 语句似乎为事件循环提供了一个在任务之间来回切换的立足点,从而产生并发的效果。

带有内部await的示例输出:

running async test
...boo 0
...baa 0
...boo 1
...baa 1
...boo 2
...baa 2

示例输出没有内部await:

...boo 0
...boo 1
...boo 2
...boo 3
...boo 4

问题

此实现是否通过了 asyncio 中并发循环任务的“正确”示例?

Task 提供阻塞点(await 表达式)以便事件循环处理多个任务的唯一方法是否正确?

编辑

2022 更新:请注意,自提出此问题以来,asyncio API 发生了相当大的变化。请参阅新标记为正确答案,它现在显示了给定 Python 3.10 的 API 的正确使用。我仍然推荐 @dano 的答案,以更广泛地了解它是如何工作的。

最佳答案

是的,在事件循环中运行的任何协程都会阻止其他协程和任务运行,除非它

  1. 使用 yield from 调用另一个协程或 await (如果使用 Python 3.5+)。
  2. 返回。

这是因为 asyncio是单线程的;事件循环运行的唯一方法是没有其他协程主动执行。使用 yield from/await暂时挂起协程,让事件循环有机会工作。

您的示例代码很好,但在许多情况下,您可能不希望在事件循环中运行不执行异步 I/O 的长时间运行代码。在这些情况下,使用 asyncio.loop.run_in_executor 通常更有意义。在后台线程或进程中运行代码。 ProcessPoolExecutor如果您的任务受 CPU 限制,这将是更好的选择,ThreadPoolExecutor如果您需要执行一些不是 asyncio 的 I/O,将使用-友好。

例如,您的两个循环完全受 CPU 限制并且不共享任何状态,因此最佳性能来自使用 ProcessPoolExecutor跨 CPU 并行运行每个循环:

import asyncio
from concurrent.futures import ProcessPoolExecutor

print('running async test')

def say_boo():
    i = 0
    while True:
        print('...boo {0}'.format(i))
        i += 1


def say_baa():
    i = 0
    while True:
        print('...baa {0}'.format(i))
        i += 1

if __name__ == "__main__":
    executor = ProcessPoolExecutor(2)
    loop = asyncio.new_event_loop()
    boo = loop.run_in_executor(executor, say_boo)
    baa = loop.run_in_executor(executor, say_baa)

    loop.run_forever()

关于python - 如何使用 python 的 asyncio 模块正确创建和运行并发任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29269370/

相关文章:

python - 需要帮助创建具有两个参数的输入函数

java - 我的ConcurrentHashmap值类型是List,如何使附加到该列表线程安全?

python - master/worker 是否可扩展?

c# - 在 c# 中以异步方式启动多个 "TASK"s

Ruby rake 打印参数,但不喜欢它在 methodcallll 上的值

python - Tkinter/Python 和 Arduino

python - matplotlib 字体与 LaTeX 字体不匹配

javascript - Accordion 表 Django 模板

multithreading - PetraVM Jinx Beta 1 好用吗?

c# - .NET 任务中的代码最终是否通过 native Windows 线程在 CPU 或 CPU 核心上运行?