TL; DR
有没有办法等待多个期货,并以给定的顺序完成,从而从中获利?
很长的故事
假设您有两个数据源。一个给您id -> name
映射,另一个给您id -> age
映射。您要计算(name, age) -> number_of_ids_with_that_name_and_age
。
有太多数据无法直接加载,但是两个数据源都支持通过id
进行分页/迭代和排序。
所以你写类似def iterate_names():
for page in get_name_page_numbers():
yield from iterate_name_page(page) # yields (id, name) pairs
和年龄相同,然后遍历iterate_names()
和iterate_ages()
。
怎么了发生的是:
基本上,您在处理数据时不会等待任何请求。
您可以使用asyncio.gather
发送所有请求并等待所有数据,但是然后:
有asyncio.as_completed
,它允许您在获取结果时发送所有请求并处理页面,但是您将使页面混乱,因此您将无法进行处理。
理想情况下,将有一个函数会发出第一个请求,并且随着响应的到来,发出第二个请求并同时产生第一个请求的结果。
那可能吗?
最佳答案
您的问题中发生了很多事情。我会尝试所有这些。
有没有办法等待多个期货,并以给定的顺序完成,从而从中获利?
是。您的代码可以按顺序yield from
或await
任何数量的期货。如果您专门谈论Task
,并且希望这些任务同时执行,则只需将它们分配给循环(在asyncio.ensure_future()
或loop.create_task()
时完成),然后循环就需要运行。
至于按顺序产生它们,您可以在创建任务时首先确定该顺序。在一个简单的示例中,在您开始处理所有任务/功能之前,已经创建了所有任务/功能,可以使用list
存储任务的将来,并最终从列表中拉出:
loop = asyncio.get_event_loop()
tasks_im_waiting_for = []
for thing in things_to_get:
task = loop.create_task(get_a_thing_coroutine(thing))
tasks_im_waiting_for.append(task)
@asyncio.coroutine
def process_gotten_things(getter_tasks):
for task in getter_tasks:
result = yield from task
print("We got {}".format(result))
loop.run_until_complete(process_gotten_things(tasks_im_waiting_for))
该示例一次只能处理一个结果,但在等待序列中的下一个吸毒者任务完成时,仍将允许任何计划的吸气剂任务继续执行其工作。如果处理顺序无关紧要,并且我们想一次处理多个可能准备就绪的结果,那么我们可以使用
deque
而不是list
,其中不止一个process_gotten_things
任务.pop()
从deque
。如果我们想变得更高级,可以执行as Vincent suggests in a comment to your question并使用asyncio.Queue
代替deque
。使用这样的队列,您可以让生产者将任务添加到与任务处理使用者同时运行的队列中。但是,使用
deque
或Queue
排序期货以进行处理有一个缺点,那就是,与运行处理器任务时一样,您只能同时处理多个期货。您可以在每次排队要处理的新将来时创建一个新的处理器任务,但是此时,此队列成为完全冗余的数据结构,因为 asyncio已经为您提供了一个类似于队列的对象,其中添加的所有内容均得到处理并发:事件循环。 对于我们计划的每个任务,我们还可以计划其处理。修改以上示例:for thing in things_to_get:
getter_task = loop.create_task(get_a_thing_coroutine(thing))
processor_task = loop.create_task(process_gotten_thing(getter_task))
# Tasks are futures; the processor can await the result once started
现在让我们说,我们的getter可能会返回多个事物(类似于您的方案),并且每个事物都需要进行一些处理。这使我进入了另一种异步设计模式:子任务。您的任务可以安排事件循环上的其他任务。当事件循环运行时,您的第一个任务的顺序将保持不变,但是如果其中一个任务最终等待某件事,那么您的一个子任务就有可能在其中开始执行。修改上述方案后,我们可以将循环传递给协程,以便协程可以安排处理其结果的任务:
for thing in things_to_get:
task = loop.create_task(get_a_thing_coroutine(thing, loop))
@asyncio.coroutine
def get_a_thing_coroutine(thing, loop):
results = yield from long_time_database_call(thing)
subtasks = []
for result in results:
subtasks.append(loop.create_task(process_result(result)))
# With subtasks scheduled in the order we like, wait for them
# to finish before we consider THIS task complete.
yield from asyncio.wait(subtasks)
所有这些高级模式都按照您想要的顺序启动任务,但可能会以任何顺序完成处理任务。如果您确实需要以与开始获取结果完全相同的顺序处理结果,则请坚持使用单个处理器从序列中提取结果期货或从
asyncio.Queue
中提取收益。您还将注意到,为了确保任务以可预测的顺序启动,我使用
loop.create_task()
明确地安排了任务。尽管asyncio.gather()
和asyncio.wait()
会很高兴地将协程对象并将它们作为Task
进行调度/包装,但是在撰写本文时,它们在以可预测的顺序进行调度方面存在问题。 See asyncio issue #432。好的,让我们回到您的具体情况。您有两个单独的结果源,这些结果需要通过一个公用键
id
结合在一起。我提到的获取和处理这些东西的模式并不能解决这样的问题,我也不知道完美的模式是什么。我将尽我所能尝试这种方法。# defaultdicts are great for representing knowledge that an interested
# party might want whether or not we have any knowledge to begin with:
from collections import defaultdict
# Let's start with a place to store our end goal:
name_and_age_to_id_count = defaultdict(int)
# Given we're correlating info from two sources, let's make two places to
# store that info, keyed by what we're joining on: id
# When we join correlate this info, only one side might be known, so use a
# Future on both sides to represent data we may or may not have yet.
id_to_age_future = defaultdict(loop.create_future)
id_to_name_future = defaultdict(loop.create_future)
# As soon as we learn the name or age for an id, we can begin processing
# the joint information, but because this information is coming from
# multiple sources we want to process concurrently we need to keep track
# of what ids we've started processing the joint info for.
ids_scheduled_for_processing = set()
@asyncio.coroutine
def process_name_page(page_number):
subtasks = []
for id, name in iterate_name_page(page_number):
name_future = id_to_name_future[id]
name_future.set_result(name)
if id not in ids_scheduled_for_processing:
age_future = id_to_age_future[id]
task = loop.create_task(increment_name_age_pair(id, name_future, age_future))
subtasks.append(task)
ids_scheduled_for_processing.add(id)
yield from asyncio.wait(subtasks)
@asyncio.coroutine
def process_age_page(page_number):
subtasks = []
for id, age in iterate_age_page(page_number):
age_future = id_to_age_future[id]
age_future.set_result(age)
if id not in ids_scheduled_for_processing:
name_future = id_to_name_future[id]
task = loop.create_task(increment_name_age_pair(id, name_future, age_future))
subtasks.append(task)
ids_scheduled_for_processing.add(id)
yield from asyncio.wait(subtasks)
@asyncio.coroutine
def increment_name_age_pair(id, name_future, age_future):
# This will wait until both futures are resolved and let other tasks work in the meantime:
pair = ((yield from name_future), (yield from age_future))
name_and_age_to_id_count[pair] += 1
# If memory is a concern:
ids_scheduled_for_processing.discard(id)
del id_to_age_future[id]
del id_to_name_future[id]
page_processing_tasks = []
# Interleave name and age pages:
for name_page_number, age_page_number in zip_longest(
get_name_page_numbers(),
get_age_page_numbers()
):
# Explicitly schedule it as a task in the order we want because gather
# and wait have non-deterministic scheduling order:
if name_page_number is not None:
page_processing_tasks.append(loop.create_task(process_name_page(name_page_number)))
if age_page_number is not None:
page_processing_tasks.append(loop.create_task(process_age_page(age_page_number)))
loop.run_until_complete(asyncio.wait(page_processing_tasks))
print(name_and_age_to_id_count)
asyncio
可能无法解决您的所有并行处理难题。您提到了“处理”要迭代的每个页面要花很长时间。如果由于要等待服务器的响应而花了很多时间,那么此架构是一种精巧的轻量级方法,可满足您的需求(只需确保使用异步循环感知工具来完成I / O操作)。如果由于Python处理数字或用CPU和内存移动东西而需要永远,那么asyncio的单线程事件循环对您没有多大帮助,因为一次仅发生一次Python操作。在这种情况下,如果您想坚持使用asyncio和子任务模式,则可能要考虑将
loop.run_in_executor
与Python解释器进程池一起使用。您还可以使用带有进程池的 concurrent.futures
library而不是使用asyncio来开发解决方案。注意:您提供的示例生成器可能会使某些人感到困惑,因为它使用
yield from
将生成委派给内部生成器。碰巧的是,异步协程使用相同的表达式等待将来的结果,并告诉循环它可以在需要时运行其他协程的代码。
关于python - Python asyncio:按顺序完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35699601/