我的应用程序从慢速 i/o 源读取数据,进行一些处理,然后将其写入本地文件。我已经用这样的生成器实现了这个:
import time
def io_task(x):
print("requesting data for input %s" % x)
time.sleep(1) # this simulates a blocking I/O task
return 2*x
def producer(xs):
for x in xs:
yield io_task(x)
def consumer(xs):
with open('output.txt', 'w') as fp:
for x in xs:
print("writing %s" % x)
fp.write(str(x) + '\n')
data = [1,2,3,4,5]
consumer(producer(data))
现在我想在 asyncio 的帮助下并行执行此任务,但我似乎不知道如何操作。对我来说,主要问题是直接通过生成器将数据从生产者提供给消费者,同时让 asyncio 向 io_task(x)
发出多个并行请求。此外,整个 async def
与 @asyncio.coroutine
的对比让我很困惑。
谁能告诉我如何构建一个使用此示例代码中的 asyncio
的最小工作示例?
(注意:不只调用 io_task()
,缓冲结果,然后将它们写入文件。我需要一个有效的解决方案在可能超过主内存的大数据集上,这就是我到目前为止一直使用生成器的原因。但是可以安全地假设消费者总是比所有生产者加起来更快)
最佳答案
自 python 3.6 和 asynchronous generators ,只需进行很少的更改即可使您的代码与 asyncio 兼容。
io_task
函数变成协程:
async def io_task(x):
await asyncio.sleep(1)
return 2*x
producer
生成器变为异步生成器:
async def producer(xs):
for x in xs:
yield await io_task(x)
consumer
函数成为协程并使用 aiofiles 、异步上下文管理和异步迭代:
async def consumer(xs):
async with aiofiles.open('output.txt', 'w') as fp:
async for x in xs:
await fp.write(str(x) + '\n')
主协程在事件循环中运行:
data = [1,2,3,4,5]
main = consumer(producer(data))
loop = asyncio.get_event_loop()
loop.run_until_complete(main)
loop.close()
此外,您可以考虑使用 aiostream在生产者和消费者之间通过管道传输一些处理操作。
编辑:使用 as_completed 可以很容易地在生产者端同时运行不同的 I/O 任务。 :
async def producer(xs):
coros = [io_task(x) for x in xs]
for future in asyncio.as_completed(coros):
yield await future
关于python - 使用 asyncio 并行化生成器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46789093/