我正在制作一个脚本,该脚本获取近 20,000 个页面的 HTML 并对其进行解析以仅获取其中的一部分。
我设法使用 asyncio 和 aiohttp 通过异步请求在数据帧中获取 20,000 个页面的内容,但此脚本仍然等待获取所有页面来解析它们。
async def get_request(session, url, params=None):
async with session.get(url, headers=HEADERS, params=params) as response:
return await response.text()
async def get_html_from_url(urls):
tasks = []
async with aiohttp.ClientSession() as session:
for url in urls:
tasks.append(get_request(session, url))
html_page_response = await asyncio.gather(*tasks)
return html_page_response
html_pages_list = asyncio_loop.run_until_complete(get_html_from_url(urls))
一旦我获得了每个页面的内容,我就设法使用多处理池来并行解析。
get_whatiwant_from_html(html_content):
parsed_html = BeautifulSoup(html_content, "html.parser")
clean = parsed_html.find("div", class_="class").get_text()
# Some re.subs
clean = re.sub("", "", clean)
clean = re.sub("", "", clean)
clean = re.sub("", "", clean)
return clean
pool = Pool(4)
what_i_want = pool.map(get_whatiwant_from_html, html_content_list)
这段代码异步混合了获取和解析,但我想将多处理集成到其中:
async def process(url, session):
html = await getRequest(session, url)
return await get_whatiwant_from_html(html)
async def dispatch(urls):
async with aiohttp.ClientSession() as session:
coros = (process(url, session) for url in urls)
return await asyncio.gather(*coros)
result = asyncio.get_event_loop().run_until_complete(dispatch(urls))
有什么明显的方法可以做到这一点吗?我考虑过创建 4 个进程,每个进程运行异步调用,但实现看起来有点复杂,我想知道是否还有其他方法。
我对 asyncio 和 aiohttp 非常陌生,所以如果您有什么建议我阅读以更好地理解,我会非常高兴。
最佳答案
您可以使用ProcessPoolExecutor .
与 run_in_executor您可以在主 asyncio 进程中执行 IO。
但是大量的 CPU 计算是在不同的进程中进行的。
async def get_data(session, url, params=None):
loop = asyncio.get_event_loop()
async with session.get(url, headers=HEADERS, params=params) as response:
html = await response.text()
data = await loop.run_in_executor(None, partial(get_whatiwant_from_html, html))
return data
async def get_data_from_urls(urls):
tasks = []
async with aiohttp.ClientSession() as session:
for url in urls:
tasks.append(get_data(session, url))
result_data = await asyncio.gather(*tasks)
return result_data
executor = concurrent.futures.ProcessPoolExecutor(max_workers=10)
asyncio_loop.set_default_executor(executor)
results = asyncio_loop.run_until_complete(get_data_from_urls(urls))
关于python - 将 aiohttp 与多处理结合起来,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53965948/