python - 异步 python itertools 链接多个生成器

标签 python python-3.x asynchronous python-asyncio sequence-generators

更新的清晰度问题:
假设我有 2 个处理生成器函数:

def gen1(): # just for examples,
  yield 1   # yields actually carry 
  yield 2   # different computation weight 
  yield 3   # in my case

def gen2():
  yield 4
  yield 5
  yield 6
我可以用 itertools 链接它们
from itertools import chain

mix = chain(gen1(), gen2())
然后我可以用它创建另一个生成器函数对象,
def mix_yield():
   for item in mix:
      yield item
或者只是如果我只想 next(mix) , 在那。
我的问题是,如何在异步代码中执行等效操作?
因为我需要它:
  • yield 返回(一一),或与 next迭代器
  • 最快解析的产量优先(异步)

  • 上一篇更新:
    经过试验和研究,我找到了aiostream声明为 itertools 的异步版本的库,所以我做了什么:
    import asyncio
    from aiostream import stream
    
    async def gen1(): 
         await asyncio.sleep(0) 
         yield 1 
         await asyncio.sleep(0) 
         yield 2 
         await asyncio.sleep(0) 
         yield 3 
    
    async def gen2(): 
         await asyncio.sleep(0) 
         yield 4 
         await asyncio.sleep(0) 
         yield 5 
         await asyncio.sleep(0) 
         yield 6 
    
    a_mix = stream.combine.merge(gen1(),gen2())
    
    async def a_mix_yield():
       for item in a_mix:
          yield item
    
    但我还是做不到next(a_mix)
    TypeError: 'merge' object is not an iterator
    
    next(await a_mix)
    raise StreamEmpty()
    
    虽然我仍然可以把它做成一个列表:
    print(await stream.list(a_mix))
    # [1, 2, 4, 3, 5, 6]
    
    所以一个目标完成了,还有一个目标:
  • yield 返回(一一),或用 next迭代器
    - 最快的解决产量优先(异步)
  • 最佳答案

    Python的 next 内置函数只是调用底层的一种便捷方式 __next__对象上的方法。异步等效于 __next____anext__异步迭代器上的方法。没有anext全局函数,但可以很容易地编写它:

    async def anext(aiterator):
        return await aiterator.__anext__()
    

    但是节省的空间很小,在极少数情况下需要这样做时,还可以调用 __anext__直接地。异步迭代器又通过调用 __aiter__ 从异步迭代器中获得。 (类似于由常规迭代提供的 __iter__)。手动驱动的异步迭代如下所示:
    a_iterator = obj.__aiter__()          # regular method
    elem1 = await a_iterator.__anext__()  # async method
    elem2 = await a_iterator.__anext__()  # async method
    ...
    
    __anext__会养StopAsyncIteration当没有更多元素可用时。要循环异步迭代器,应该使用 async for .

    这是一个基于您的代码的可运行示例,同时使用 __anext__async for用完 aiostream.stream.combine.merge 设置的流:
    async def main():
        a_mix = stream.combine.merge(gen1(), gen2())
        async with a_mix.stream() as streamer:
            mix_iter = streamer.__aiter__()    
            print(await mix_iter.__anext__())
            print(await mix_iter.__anext__())
            print('remaining:')
            async for x in mix_iter:
                print(x)
    
    asyncio.get_event_loop().run_until_complete(main())
    

    关于python - 异步 python itertools 链接多个生成器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53422850/

    相关文章:

    python - Tweepy:TweepError:[{'code':112, 'message': 'You must specify either a list ID or a slug and owner.'}]

    python - django 表没有名为 Exception 的列

    c# - 为什么异步代码在 C# 的工作线程上运行

    python - Python 中的字典

    urls.py 中的 Django2 AttributeError

    javascript - AJAX 异步响应回调

    multithreading - 同步 HTTP 请求的最大合理超时

    python - 如何在两个数据类之间绘制分隔线?

    python - Python 中 Kivy 小部件之间的交互

    python - 修改前序树遍历为Python结构