Python Asyncio 任务在没有 gather() 的情况下运行

标签 python python-3.x asynchronous python-asyncio aiohttp

我试图重现并更好地理解 this 中的 TaskPool 示例Cristian Garcia 的博文,我遇到了一个非常有趣的结果。

这是我使用的两个脚本。我用一个随机 sleep 调用换掉了一个实际的网络请求

#task_pool.py
import asyncio

class TaskPool(object):

    def __init__(self, workers):
        self._semaphore = asyncio.Semaphore(workers)
        self._tasks = set()

    async def put(self, coro):
        await self._semaphore.acquire()
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._on_task_done)

    def _on_task_done(self, task):
        self._tasks.remove(task)
        self._semaphore.release()

    async def join(self):
        await asyncio.gather(*self._tasks)

    async def __aenter__(self):
        return self

    def __aexit__(self, exc_type, exc, tb):
        print("aexit triggered")
        return self.join()

# main.py
import asyncio
import sys
from task_pool import TaskPool
import random
limit = 3

async def fetch(i):
    timereq = random.randrange(5)
    print("request: {} start, delay: {}".format(i, timereq))
    await asyncio.sleep(timereq)
    print("request: {} end".format(i))
    return (timereq,i)

async def _main(total_requests):
    async with TaskPool(limit) as tasks:
        for i in range(total_requests):
            await tasks.put(fetch(i))

loop = asyncio.get_event_loop()
loop.run_until_complete(_main(int(sys.argv[1])))

在 python 3.7.1 上的命令 main.py 10 产生以下结果。

request: 0 start, delay: 3
request: 1 start, delay: 3
request: 2 start, delay: 3
request: 0 end
request: 1 end
request: 2 end
request: 3 start, delay: 4
request: 4 start, delay: 1
request: 5 start, delay: 0
request: 5 end
request: 6 start, delay: 1
request: 4 end
request: 6 end
request: 7 start, delay: 1
request: 8 start, delay: 4
request: 7 end
aexit triggered
request: 9 start, delay: 1
request: 9 end
request: 3 end
request: 8 end

根据这个结果我有几个问题。

  1. 在上下文管理器退出并触发 __aexit__ 之前,我不希望任务运行,因为这是 asyncio.gather 的唯一触发器。然而,打印语句强烈表明 fetch 作业甚至在 aexit 之前就已经发生了。到底发生了什么事?任务在运行吗?如果是这样,是什么开始了他们?
  2. 与 (1) 相关。为什么上下文管理器在所有作业返回之前退出?
  3. fetch 作业应该返回一个元组。我怎样才能访问这个值?对于基于 Web 的应用程序,我想开发人员可能希望对网站返回的数据进行操作。

非常感谢任何帮助!

最佳答案

  1. 任务在 create_task 后立即开始被称为。

    直接来自文档,第一行:

    Wrap the coro coroutine into a Task and schedule its execution.

  2. 不应该,但是。查看您问题中的代码:

    def __aexit__(self, exc_type, exc, tb):
        print("aexit triggered")
        return self.join()
    

    存在三个问题:

    • 这是一个常规的同步函数。将其更改为 async def 并添加必需的 await 以调用 self.join()。在这里你不调用 join 你只是创建任务但从不运行它。您的 python 肯定会提示您从不等待任务。 绝不能忽略这些警告,因为它们意味着您的程序中出现了严重错误。

      [edit:] 正如 user4815162342 在下面指出的那样,您编写的构造实际上会起作用,尽管可能不是出于预期的原因 — 它起作用是因为通过调用 self 返回的协程函数。 join() 没有等待它会被返回和使用,就好像它是 aexit 自己的一样。你不想要这个,让它异步并等待。

    • 修复后,__aexit__ 将打印“aexit triggered”,然后调用 join,等待任务完成.因此,来自尚未完成的任务的消息将出现在“aexit triggered”消息之后。

    • __aexit__ 的返回值被忽略,除非退出是因为引发了异常。在这种情况下,return True 将吞下异常。放下 return

    所以那部分,固定的:

    async def __aexit__(self, exc_type, exc, tb):
        print("aexit triggered")
        await self.join()
        print("aexit completed")
    
  3. 您的TaskPool 必须使任务的结果可用。设计是你的,Python 不会在幕后施展任何魔法。根据您的情况,join 的一个简单方法是将 gather 的结果存储为任务池的一个属性。

关于Python Asyncio 任务在没有 gather() 的情况下运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53548578/

相关文章:

javascript - AngularJS - 将第 3 方异步加载库包装为服务

python - 如何从文本文档中选择最大长度相等的单词

python - 获取笔记本的详细实时报告/监控

python - 从 for 循环中更新的字典制作可滚动的绘图

django - FormView不执行form_valid()方法

c# - 异步 WebAPI Controller 服务图像 - 挂起的 HTTP 请求

python - 保存混淆矩阵

python - 如何在 Django 1.9 中扩展用户模型?

python - 你如何在 python 中列出所有可能的安排?

javascript - 试图让异步代码在for循环内同步运行