python - 从 Python 3 中的异步生成器继续

标签 python python-3.x python-asyncio

给定一个常规生成器,您可以从中获取一个迭代器,该迭代器只能使用一次并从上次中断的地方继续。像这样 -

sync_gen = (i in range(10))
def fetch_batch_sync(num_tasks, job_list):
    for i, job in enumerate(job_list):
        yield job
        if i == num_tasks - 1:
            break
>>> sync_gen_iter = sync_gen.__iter__()
>>> for i in fetch_batch_sync(2, sync_gen_iter):
...     print i
... 
0
1
>>> for i in fetch_batch_sync(3, sync_gen_iter):
...     print i
... 
2
3
4

有没有办法对异步生成器执行相同的操作?

async def fetch_batch_async(num_tasks, job_list_iter):
    async for i, job in enumerate(job_list_iter):
        yield job
        if i == num_tasks - 1:
            break

最佳答案

常规生成器和异步生成器之间的唯一区别在于,异步生成器的 __next____iter__ 方法本身是异步的。这就是为什么普通的 forenumerate 无法将它们识别为可迭代对象。

与常规生成器一样,可以从异步生成器中提取值的子集,但您需要使用适当的工具。 fetch_batch_async 已经使用了 async for,但它还应该使用 enemuerate 的异步版本;例如:

async def aenumerate(aiterable, start=0):
    i = start
    async for obj in aiterable:
        yield i, obj
        i += 1

fetch_batch_async 会像 enumerate 一样使用它:

async def fetch_batch_async(num_tasks, job_list_iter):
    async for i, job in aenumerate(job_list_iter):
        yield job
        if i == num_tasks - 1:
            break

最后,此代码使用 fetch_batch_async 从无限异步迭代器中提取多个项目:

import asyncio, time

async def infinite():
    while True:
        yield time.time()
        await asyncio.sleep(.1)

async def main():
    async for received in fetch_batch_async(10, infinite()):
        print(received)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

关于python - 从 Python 3 中的异步生成器继续,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48934171/

相关文章:

python - 如果我将默认设置为 None 可以省略 Optional 吗?

python - 根据子元素的属性使用 XSLT 或 Python 对 XML 文件进行排序

python - 在 Python 类的方法中线程化两个函数

python - 从类定义中的列表理解访问类变量

python - 无法使用Python中的字典转换替换列表中的值

python - 如何捕获并发.futures._base.TimeoutError

python - 异步队列收集协程的返回值

python - 调用异步函数时如何防止上下文切换?

python - 在python中实现协程

python - 如何停止在整数后面显示小数点 .0? Python