python - 如何将异步生成器合并到 python 3.5+ 中的普通生成器中

标签 python multithreading asynchronous python-asyncio aiohttp

我在组合异步生成器和实际运行它们时遇到了问题。这是因为我发现运行它们的唯一方法是通过一个返回可迭代对象而不是生成器的事件循环。让我用一个简单的例子来说明这一点:

假设我有一个 google_search 函数,它通过抓取来搜索谷歌(我不是故意使用 API)。它接受一个搜索字符串并返回一个搜索结果生成器。这个生成器不会在页面结束时结束,函数会继续到下一页。因此 google_search 函数返回一个可能几乎无穷无尽的生成器(从技术上讲,它总是会结束,但通常您可以在 google 上搜索数百万次)

def google_search(search_string):
    # Basically uses requests/aiohttp and beautifulsoup
    # to parse the resulting html and yield search results
    # Assume this function works
    ......

好的,现在我想创建一个函数,允许我迭代多个 google_search 生成器。我想要这样的东西:

def google_searches(*search_strings):
    for results in zip(google_search(query) for query in search_strings):
        yield results

这样我就可以使用一个简单的 for 循环来展开 google_searches 并获得我的结果。上面的代码运行良好,但对于任何相当大数量的搜索来说都非常慢。该代码发送第一次搜索请求,然后是第二次搜索,依此类推,直到最后产生结果。我想加快速度(很多)。我的第一个想法是将 google_searches 更改为异步函数(我使用的是 python 3.6.3 并且可以使用 await/async 等)。然后这会创建一个异步生成器,这很好,但我只能在另一个异步函数或事件循环中运行它。并使用 run_until_complete(loop.gather(...)) 在事件循环中运行它返回结果列表而不是普通生成器,这违背了目的,因为可能有太多搜索结果无法保存在列表中。

我怎样才能通过异步执行请求来加快 google_searches 的功能(最好使用异步代码,但欢迎使用任何代码),同时仍然让它成为一个普通的生成器? 提前致谢!

最佳答案

接受的答案在再次调用生成器之前等待每个异步生成器的一个结果。如果数据没有以完全相同的速度出现,那可能是个问题。下面的解决方案采用多个异步迭代器(生成器或非生成器)并在多个协程中同时迭代所有这些迭代器。每个协程将结果放在一个 asyncio.Queue 中,然后由客户端代码对其进行迭代:

迭代器代码:

import asyncio
from async_timeout import timeout

class MergeAsyncIterator:
    def __init__(self, *it, timeout=60, maxsize=0):
        self._it = [self.iter_coro(i) for i in it]
        self.timeout = timeout
        self._futures = []
        self._queue = asyncio.Queue(maxsize=maxsize)

    def __aiter__(self):
        for it in self._it:
            f = asyncio.ensure_future(it)
            self._futures.append(f)
        return self

    async def __anext__(self):
        if all(f.done() for f in self._futures) and self._queue.empty():
            raise StopAsyncIteration
        with timeout(self.timeout):
            try:
                return await self._queue.get()
            except asyncio.CancelledError:
                raise StopAsyncIteration

    def iter_coro(self, it):
        if not hasattr(it, '__aiter__'):
            raise ValueError('Object passed must be an AsyncIterable')
        return self.aiter_to_queue(it)

    async def aiter_to_queue(self, ait):
        async for i in ait:
            await self._queue.put(i)
            await asyncio.sleep(0)

示例客户端代码:

import random
import asyncio
from datetime import datetime

async def myaiter(name):
    for i in range(5):
        n = random.randint(0, 3)
        await asyncio.sleep(0.1 + n)
        yield (name, n)
    yield (name, 'DONE')

async def main():
    aiters = [myaiter(i) for i in 'abc']
    async for i in MergeAsyncIterator(*aiters, timeout=3):
        print(datetime.now().strftime('%H:%M:%S.%f'), i)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

输出:

14:48:28.638975 ('a', 1)
14:48:29.638822 ('b', 2)
14:48:29.741651 ('b', 0)
14:48:29.742013 ('a', 1)
14:48:30.639588 ('c', 3)
14:48:31.742705 ('c', 1)
14:48:31.847440 ('b', 2)
14:48:31.847828 ('a', 2)
14:48:31.847960 ('c', 0)
14:48:32.950166 ('c', 1)
14:48:33.948791 ('a', 2)
14:48:34.949339 ('b', 3)
14:48:35.055487 ('c', 2)
14:48:35.055928 ('c', 'DONE')
14:48:36.049977 ('a', 2)
14:48:36.050481 ('a', 'DONE')
14:48:37.050415 ('b', 2)
14:48:37.050966 ('b', 'DONE')

PS:上面的代码使用了async_timeout第三方库。
PS2:aiostream库的作用与上述代码相同,而且还有更多。

关于python - 如何将异步生成器合并到 python 3.5+ 中的普通生成器中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48044544/

相关文章:

python - Gekko 数组与 Numpy 数组的内积

python - 创建一个新列并将其填充到 pandas 的组和条件中

python - Matplotlib动画,移动正方形

multithreading - 公开可取消的任务

c# - 调用和开始调用

python - cygwin 中的 aws cli - 如何清除窗口和 cygwin 样式路径中的差异

c++ - 当接收器忙时 Qt 信号会发生什么?

multithreading - 如何创建可以在 O(1) 中更新(删除/创建/更新)项目并且线程安全的数据结构?

c# - 通过线程经济的可扩展性 : async operations vs. 线程池上的多线程生产者/消费者队列?

javascript - Await 仅在 nodejs 的异步函数中有效