python - Python asyncio:按顺序完成

标签 python python-asyncio

TL; DR

有没有办法等待多个期货,并以给定的顺序完成,从而从中获利?

很长的故事

假设您有两个数据源。一个给您id -> name映射,另一个给您id -> age映射。您要计算(name, age) -> number_of_ids_with_that_name_and_age

有太多数据无法直接加载,但是两个数据源都支持通过id 进行分页/迭代和排序。

所以你写类似

def iterate_names():
    for page in get_name_page_numbers():
        yield from iterate_name_page(page)  # yields (id, name) pairs

和年龄相同,然后遍历iterate_names()iterate_ages()

怎么了发生的是:
  • 您请求一页名称和年龄
  • 你得到他们
  • 您将处理数据,直到到达页面末尾为止,例如
  • 您要求另一个年龄的页面
  • 您将处理数据,直到...

  • 基本上,您在处理数据时不会等待任何请求。

    您可以使用asyncio.gather发送所有请求并等待所有数据,但是然后:
  • 当第一页到达时,您仍在等待其他人
  • 内存不足

  • asyncio.as_completed,它允许您在获取结果时发送所有请求并处理页面,但是您将使页面混乱,因此您将无法进行处理。

    理想情况下,将有一个函数会发出第一个请求,并且随着响应的到来,发出第二个请求并同时产生第一个请求的结果。

    那可能吗?

    最佳答案

    您的问题中发生了很多事情。我会尝试所有这些。

    有没有办法等待多个期货,并以给定的顺序完成,从而从中获利?

    是。您的代码可以按顺序yield fromawait任何数量的期货。如果您专门谈论Task,并且希望这些任务同时执行,则只需将它们分配给循环(在asyncio.ensure_future()loop.create_task()时完成),然后循环就需要运行。

    至于按顺序产生它们,您可以在创建任务时首先确定该顺序。在一个简单的示例中,在您开始处理所有任务/功能之前,已经创建了所有任务/功能,可以使用list存储任务的将来,并最终从列表中拉出:

    loop = asyncio.get_event_loop()
    tasks_im_waiting_for = []
    for thing in things_to_get:
        task = loop.create_task(get_a_thing_coroutine(thing))
        tasks_im_waiting_for.append(task)
    
    @asyncio.coroutine
    def process_gotten_things(getter_tasks):
        for task in getter_tasks:
            result = yield from task
            print("We got {}".format(result))
    
    loop.run_until_complete(process_gotten_things(tasks_im_waiting_for))
    

    该示例一次只能处理一个结果,但在等待序列中的下一个吸毒者任务完成时,仍将允许任何计划的吸气剂任务继续执行其工作。如果处理顺序无关紧要,并且我们想一次处理多个可能准备就绪的结果,那么我们可以使用deque而不是list,其中不止一个process_gotten_things任务.pop()deque。如果我们想变得更高级,可以执行as Vincent suggests in a comment to your question并使用asyncio.Queue代替deque。使用这样的队列,您可以让生产者将任务添加到与任务处理使用者同时运行的队列中。

    但是,使用dequeQueue排序期货以进行处理有一个缺点,那就是,与运行处理器任务时一样,您只能同时处理多个期货。您可以在每次排队要处理的新将来时创建一个新的处理器任务,但是此时,此队列成为完全冗余的数据结构,因为 asyncio已经为您提供了一个类似于队列的对象,其中添加的所有内容均得到处理并发:事件循环。 对于我们计划的每个任务,我们还可以计划其处理。修改以上示例:
    for thing in things_to_get:
        getter_task = loop.create_task(get_a_thing_coroutine(thing))
        processor_task = loop.create_task(process_gotten_thing(getter_task))
        # Tasks are futures; the processor can await the result once started
    

    现在让我们说,我们的getter可能会返回多个事物(类似于您的方案),并且每个事物都需要进行一些处理。这使我进入了另一种异步设计模式:子任务。您的任务可以安排事件循环上的其他任务。当事件循环运行时,您的第一个任务的顺序将保持不变,但是如果其中一个任务最终等待某件事,那么您的一个子任务就有可能在其中开始执行。修改上述方案后,我们可以将循环传递给协程,以便协程可以安排处理其结果的任务:
    for thing in things_to_get:
        task = loop.create_task(get_a_thing_coroutine(thing, loop))
    
    @asyncio.coroutine
    def get_a_thing_coroutine(thing, loop):
        results = yield from long_time_database_call(thing)
        subtasks = []
        for result in results:
            subtasks.append(loop.create_task(process_result(result)))
        # With subtasks scheduled in the order we like, wait for them
        # to finish before we consider THIS task complete.
        yield from asyncio.wait(subtasks)
    

    所有这些高级模式都按照您想要的顺序启动任务,但可能会以任何顺序完成处理任务。如果您确实需要以与开始获取结果完全相同的顺序处理结果,则请坚持使用单个处理器从序列中提取结果期货或从asyncio.Queue中提取收益。

    您还将注意到,为了确保任务以可预测的顺序启动,我使用loop.create_task()明确地安排了任务。尽管asyncio.gather()asyncio.wait()会很高兴地将协程对象并将它们作为Task进行调度/包装,但是在撰写本文时,它们在以可预测的顺序进行调度方面存在问题。 See asyncio issue #432

    好的,让我们回到您的具体情况。您有两个单独的结果源,这些结果需要通过一个公用键id结合在一起。我提到的获取和处理这些东西的模式并不能解决这样的问题,我也不知道完美的模式是什么。我将尽我所能尝试这种方法。
  • 我们需要一些对象来维护我们所知道的和到目前为止所做的事情的状态,以便随着知识的增长将其关联起来。
    # defaultdicts are great for representing knowledge that an interested
    # party might want whether or not we have any knowledge to begin with:
    from collections import defaultdict
    # Let's start with a place to store our end goal:
    name_and_age_to_id_count = defaultdict(int)
    
    # Given we're correlating info from two sources, let's make two places to
    # store that info, keyed by what we're joining on: id
    # When we join correlate this info, only one side might be known, so use a
    # Future on both sides to represent data we may or may not have yet.
    id_to_age_future = defaultdict(loop.create_future)
    id_to_name_future = defaultdict(loop.create_future)
    
    # As soon as we learn the name or age for an id, we can begin processing
    # the joint information, but because this information is coming from
    # multiple sources we want to process concurrently we need to keep track
    # of what ids we've started processing the joint info for.
    ids_scheduled_for_processing = set()
    
  • 我们知道我们将通过您提到的迭代器在“页面”中获取此信息,因此让我们从这里开始设计任务:
    @asyncio.coroutine
    def process_name_page(page_number):
        subtasks = []
        for id, name in iterate_name_page(page_number):
            name_future = id_to_name_future[id]
            name_future.set_result(name)
            if id not in ids_scheduled_for_processing:
                age_future = id_to_age_future[id]
                task = loop.create_task(increment_name_age_pair(id, name_future, age_future))
                subtasks.append(task)
                ids_scheduled_for_processing.add(id)
        yield from asyncio.wait(subtasks)
    
    @asyncio.coroutine
    def process_age_page(page_number):
        subtasks = []
        for id, age in iterate_age_page(page_number):
            age_future = id_to_age_future[id]
            age_future.set_result(age)
            if id not in ids_scheduled_for_processing:
                name_future = id_to_name_future[id]
                task = loop.create_task(increment_name_age_pair(id, name_future, age_future))
                subtasks.append(task)
                ids_scheduled_for_processing.add(id)
        yield from asyncio.wait(subtasks)
    
  • 这些协程计划要处理的ID的名称/年龄对-更具体地说,ID的名称和年龄的 future 。一旦开始,处理器将等待两个期货的结果(某种意义上将它们加入)。
    @asyncio.coroutine
    def increment_name_age_pair(id, name_future, age_future):
        # This will wait until both futures are resolved and let other tasks work in the meantime:
        pair = ((yield from name_future), (yield from age_future))
    
        name_and_age_to_id_count[pair] += 1
    
        # If memory is a concern:
        ids_scheduled_for_processing.discard(id)
        del id_to_age_future[id]
        del id_to_name_future[id]
    
  • 好,我们有获取/迭代页面的任务以及处理这些页面中内容的子任务。现在,我们需要实际安排获取这些页面的时间。回到您的问题,我们有两个要提取的数据源,我们希望从它们并行提取。我们假定来自一个的信息的顺序与来自另一个的信息的顺序紧密相关,因此我们在事件循环中对两者的处理进行交织。
    page_processing_tasks = []
    # Interleave name and age pages:
    for name_page_number, age_page_number in zip_longest(
        get_name_page_numbers(),
        get_age_page_numbers()
    ):
        # Explicitly schedule it as a task in the order we want because gather
        # and wait have non-deterministic scheduling order:
        if name_page_number is not None:
            page_processing_tasks.append(loop.create_task(process_name_page(name_page_number)))
        if age_page_number is not None:
            page_processing_tasks.append(loop.create_task(process_age_page(age_page_number)))
    
  • 现在我们已经安排了顶层任务,我们最终可以实际执行以下操作:
    loop.run_until_complete(asyncio.wait(page_processing_tasks))
    print(name_and_age_to_id_count)
    
  • asyncio可能无法解决您的所有并行处理难题。您提到了“处理”要迭代的每个页面要花很长时间。如果由于要等待服务器的响应而花了很多时间,那么此架构是一种精巧的轻量级方法,可满足您的需求(只需确保使用异步循环感知工具来完成I / O操作)。

    如果由于Python处理数字或用CPU和内存移动东西而需要永远,那么asyncio的单线程事件循环对您没有多大帮助,因为一次仅发生一次Python操作。在这种情况下,如果您想坚持使用asyncio和子任务模式,则可能要考虑将 loop.run_in_executor 与Python解释器进程池一起使用。您还可以使用带有进程池的 concurrent.futures library而不是使用asyncio来开发解决方案。

    注意:您提供的示例生成器可能会使某些人感到困惑,因为它使用yield from将生成委派给内部生成器。碰巧的是,异步协程使用相同的表达式等待将来的结果,并告诉循环它可以在需要时运行其他协程的代码。

    关于python - Python asyncio:按顺序完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35699601/

    相关文章:

    python - 最大化并行请求数 (aiohttp)

    python - 保护用户上传的文件 django

    python - 使用自动重建索引将 Series 插入 DataFrame

    python - python 初学者必备的标准库

    python - 在 FastApi 中运行 Asyncio 子进程会导致 NotImplementedError

    python - 根据实例化的是异步实例还是同步实例返回包装器

    python - 使用Python哪个框架创建简单的CRUD网站?

    python - Django ReST Framework - 扩展用户模型不接受 "null=true"

    python - 为什么 asyncio 在没有任何消息的情况下引发 TimeoutError?

    python - 使文件处理代码与 asyncio 兼容