python - AsyncGenerator 上的异步 for 循环

标签 python loops for-loop asynchronous python-asyncio

有了一个异步生成器,我希望能够异步地迭代它。但是,我遗漏了一些东西或弄乱了一些东西或两者兼而有之,因为我最终得到了一个常规的同步 for 循环:

import asyncio


async def time_consuming(t):
    print(f"Going to sleep for {t} seconds")
    await asyncio.sleep(t)
    print(f"Slept {t} seconds")
    return t


async def generator():
    for i in range(4, 0, -1):
        yield await time_consuming(i)


async def consumer():
    async for t in generator():
        print(f"Doing something with {t}")


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    loop.run_until_complete(consumer())
    loop.close()

这将需要大约 12 秒的时间来运行并返回:

Going to sleep for 4 seconds
Slept 4 seconds
Doing something with 4
Going to sleep for 3 seconds
Slept 3 seconds
Doing something with 3
Going to sleep for 2 seconds
Slept 2 seconds
Doing something with 2
Going to sleep for 1 seconds
Slept 1 seconds
Doing something with 1

虽然我预计它需要大约 4 秒才能运行并返回如下内容:

Going to sleep for 4 seconds
Going to sleep for 3 seconds
Going to sleep for 2 seconds
Going to sleep for 1 seconds
Slept 4 seconds
Doing something with 4
Slept 3 seconds
Doing something with 3
Slept 2 seconds
Doing something with 2
Slept 1 seconds
Doing something with 1

最佳答案

异步生成器并不意味着您同时执行迭代!您所获得的只是让协程有更多的地方让步给其他任务。迭代步骤仍然连续运行

换句话说:异步迭代器适用于需要使用 I/O 获取每个迭代步骤的迭代器。考虑循环网络套接字的结果或文件中的行。如果迭代器上的每个 next() 步都需要等待慢速 I/O 源提供数据,那么这是将控制权交给已设置为同时运行的其他事物的好点。

如果您希望生成器的每个单独步骤同时运行,那么您仍然需要使用事件循环显式安排额外的任务。

当所有这些额外任务完成后,您就可以从生成器返回。如果您将 4 个 time_consuming() 协程安排为任务,请使用 asyncio.wait()等待一个或所有任务完成,并从已完成的任务中产生结果,然后是的,在您的 for i in range(...): 循环完成后,您的过程将总共只需要 4 秒:

async def generator():
    pending = []
    for i in range(4, 0, -1):
        pending.append(asyncio.create_task(time_consuming(i)))

    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            yield task.result()

此时输出变为

Going to sleep for 4 seconds
Going to sleep for 3 seconds
Going to sleep for 2 seconds
Going to sleep for 1 seconds
Slept 1 seconds
Doing something with 1
Slept 2 seconds
Doing something with 2
Slept 3 seconds
Doing something with 3
Slept 4 seconds
Doing something with 4

请注意,这是您预期输出的相反顺序,因为这会在任务完成时获取任务结果,而不是等待创建的第一个任务完成。通常这就是你想要的,真的。当您在 1 秒后已经准备好结果时,为什么还要等待 4 秒?

你也可以有你的变体,某种程度上,但你只是用不同的方式编码。然后你可以使用 asyncio.gather() on the 4 tasks ,它安排一堆协程作为并发任务运行,并将它们的结果作为列表返回,之后你可以产生这些结果:

async def generator():
    tasks = []
    for i in range(4, 0, -1):
        tasks.append(time_consuming(i))

    for res in await asyncio.gather(*tasks):
        yield res 

但是现在输出变成了

Going to sleep for 4 seconds
Going to sleep for 3 seconds
Going to sleep for 2 seconds
Going to sleep for 1 seconds
Slept 1 seconds
Slept 2 seconds
Slept 3 seconds
Slept 4 seconds
Doing something with 4
Doing something with 3
Doing something with 2
Doing something with 1

因为在最长的任务 time_consuming(4) 完成之前我们不能做任何进一步的事情,但是运行时间较短的任务在此之前完成并且已经输出它们的 Slept 。 .. 秒 消息。

关于python - AsyncGenerator 上的异步 for 循环,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52856569/

相关文章:

c++ - 如果(!getline(std::cin,str1))中断;这个条件检查什么?

Javascript - 从数组中查找所有类,从类中获取并返回内部文本

python - 使用 matplotlib 创建表,使用 float 创建列,使用字符串创建其他列?

python - Python类变量是静态的吗?

ruby - 是否保证 Ruby 哈希文字的顺序?

objective-c - 变量变大,但结果却是最小的数?

javascript - "place"在此 for 循环中意味着什么

r - 在 R 中使函数可循环

python - if 语句后的 "UnboundLocalError: local variable referenced before assignment"

python - Tensorflow 2.0具体函数structured_input_signature返回值