正在替换我使用 ThreadPoolExecutors
的服务器查询工具的实现所有使用 asyncio
和 aiohttp
的异步调用。由于网络调用是非阻塞 IO,因此大部分转换都是直截了当的,这是让我陷入困境的响应的保存。
我使用的所有示例,甚至是两个库的文档,都使用 asyncio.gather()
来收集所有可等待的结果。在我的例子中,这些结果可以是许多 GB 范围内的文件,我不想将它们存储在内存中。
解决这个问题的适当方法是什么?是否使用asyncio.as_completed()
然后:
for f in as_completed(aws):
earliest_result = await f
# Assumes `loop` defined under `if __name__` block outside coroutine
loop = get_event_loop()
# Run the blocking IO in an exectuor and write to file
_ = await loop.run_in_executor(None, save_result, earliest_result)
这不会引入一个线程(假设我默认使用 ThreadPoolExecutor
)从而使它成为一个异步的多线程程序而不是一个异步的单线程程序吗?
此外,这是否确保任何时候只有 1 个 earliest_result
被写入文件?我不希望调用 await loop.run_in_executor(...)
运行,然后出现另一个结果,我尝试运行到同一个文件;我想我可以用信号量来限制。
最佳答案
我建议使用 aiohttp Streaming API .将您的响应直接写入磁盘而不是 RAM,并返回文件名而不是来自 gather 的响应本身。这样做根本不会使用大量内存。这是我的意思的一个小演示:
import asyncio
import aiofiles
from aiohttp import ClientSession
async def make_request(session, url):
response = await session.request(method="GET", url=url)
filename = url.split('/')[-1]
async for data in response.content.iter_chunked(1024):
async with aiofiles.open(filename, "ba") as f:
await f.write(data)
return filename
async def main():
urls = ['https://github.com/Tinche/aiofiles',
'https://github.com/aio-libs/aiohttp']
async with ClientSession() as session:
coros = [make_request(session, url) for url in urls]
result_files = await asyncio.gather(*coros)
print(result_files)
asyncio.run(main())
关于python - 在异步程序中将 Web 响应写入文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55054965/