python - 如何将异步生成器合并到 python 3.5+ 中的普通生成器中

标签 python multithreading asynchronous python-asyncio aiohttp


假设我有一个 google_search 函数,它通过抓取来搜索谷歌(我不是故意使用 API)。它接受一个搜索字符串并返回一个搜索结果生成器。这个生成器不会在页面结束时结束,函数会继续到下一页。因此 google_search 函数返回一个可能几乎无穷无尽的生成器(从技术上讲,它总是会结束,但通常您可以在 google 上搜索数百万次)

def google_search(search_string):
    # Basically uses requests/aiohttp and beautifulsoup
    # to parse the resulting html and yield search results
    # Assume this function works

好的,现在我想创建一个函数,允许我迭代多个 google_search 生成器。我想要这样的东西:

def google_searches(*search_strings):
    for results in zip(google_search(query) for query in search_strings):
        yield results

这样我就可以使用一个简单的 for 循环来展开 google_searches 并获得我的结果。上面的代码运行良好,但对于任何相当大数量的搜索来说都非常慢。该代码发送第一次搜索请求,然后是第二次搜索,依此类推,直到最后产生结果。我想加快速度(很多)。我的第一个想法是将 google_searches 更改为异步函数(我使用的是 python 3.6.3 并且可以使用 await/async 等)。然后这会创建一个异步生成器,这很好,但我只能在另一个异步函数或事件循环中运行它。并使用 run_until_complete(loop.gather(...)) 在事件循环中运行它返回结果列表而不是普通生成器,这违背了目的,因为可能有太多搜索结果无法保存在列表中。

我怎样才能通过异步执行请求来加快 google_searches 的功能(最好使用异步代码,但欢迎使用任何代码),同时仍然让它成为一个普通的生成器? 提前致谢!


接受的答案在再次调用生成器之前等待每个异步生成器的一个结果。如果数据没有以完全相同的速度出现,那可能是个问题。下面的解决方案采用多个异步迭代器(生成器或非生成器)并在多个协程中同时迭代所有这些迭代器。每个协程将结果放在一个 asyncio.Queue 中,然后由客户端代码对其进行迭代:


import asyncio
from async_timeout import timeout

class MergeAsyncIterator:
    def __init__(self, *it, timeout=60, maxsize=0):
        self._it = [self.iter_coro(i) for i in it]
        self.timeout = timeout
        self._futures = []
        self._queue = asyncio.Queue(maxsize=maxsize)

    def __aiter__(self):
        for it in self._it:
            f = asyncio.ensure_future(it)
        return self

    async def __anext__(self):
        if all(f.done() for f in self._futures) and self._queue.empty():
            raise StopAsyncIteration
        with timeout(self.timeout):
                return await self._queue.get()
            except asyncio.CancelledError:
                raise StopAsyncIteration

    def iter_coro(self, it):
        if not hasattr(it, '__aiter__'):
            raise ValueError('Object passed must be an AsyncIterable')
        return self.aiter_to_queue(it)

    async def aiter_to_queue(self, ait):
        async for i in ait:
            await self._queue.put(i)
            await asyncio.sleep(0)


import random
import asyncio
from datetime import datetime

async def myaiter(name):
    for i in range(5):
        n = random.randint(0, 3)
        await asyncio.sleep(0.1 + n)
        yield (name, n)
    yield (name, 'DONE')

async def main():
    aiters = [myaiter(i) for i in 'abc']
    async for i in MergeAsyncIterator(*aiters, timeout=3):
        print('%H:%M:%S.%f'), i)

loop = asyncio.get_event_loop()


14:48:28.638975 ('a', 1)
14:48:29.638822 ('b', 2)
14:48:29.741651 ('b', 0)
14:48:29.742013 ('a', 1)
14:48:30.639588 ('c', 3)
14:48:31.742705 ('c', 1)
14:48:31.847440 ('b', 2)
14:48:31.847828 ('a', 2)
14:48:31.847960 ('c', 0)
14:48:32.950166 ('c', 1)
14:48:33.948791 ('a', 2)
14:48:34.949339 ('b', 3)
14:48:35.055487 ('c', 2)
14:48:35.055928 ('c', 'DONE')
14:48:36.049977 ('a', 2)
14:48:36.050481 ('a', 'DONE')
14:48:37.050415 ('b', 2)
14:48:37.050966 ('b', 'DONE')


关于python - 如何将异步生成器合并到 python 3.5+ 中的普通生成器中,我们在Stack Overflow上找到一个类似的问题:


python - Gekko 数组与 Numpy 数组的内积

python - 创建一个新列并将其填充到 pandas 的组和条件中

python - Matplotlib动画,移动正方形

multithreading - 公开可取消的任务

c# - 调用和开始调用

python - cygwin 中的 aws cli - 如何清除窗口和 cygwin 样式路径中的差异

c++ - 当接收器忙时 Qt 信号会发生什么?

multithreading - 如何创建可以在 O(1) 中更新(删除/创建/更新)项目并且线程安全的数据结构?

c# - 通过线程经济的可扩展性 : async operations vs. 线程池上的多线程生产者/消费者队列?

javascript - Await 仅在 nodejs 的异步函数中有效