python - 将 aiohttp 与多处理结合起来

标签 python parallel-processing multiprocessing python-asyncio aiohttp

我正在制作一个脚本,该脚本获取近 20,000 个页面的 HTML 并对其进行解析以仅获取其中的一部分。

我设法使用 asyncio 和 aiohttp 通过异步请求在数据帧中获取 20,000 个页面的内容,但此脚本仍然等待获取所有页面来解析它们。

async def get_request(session, url, params=None):
    async with session.get(url, headers=HEADERS, params=params) as response:
        return await response.text()


async def get_html_from_url(urls):
    tasks = []
    async with aiohttp.ClientSession() as session:
        for url in urls:
            tasks.append(get_request(session, url))
        html_page_response = await asyncio.gather(*tasks)
    return html_page_response


html_pages_list = asyncio_loop.run_until_complete(get_html_from_url(urls))

一旦我获得了每个页面的内容,我就设法使用多处理池来并行解析。

get_whatiwant_from_html(html_content):

    parsed_html = BeautifulSoup(html_content, "html.parser")
    clean = parsed_html.find("div", class_="class").get_text()

    # Some re.subs
    clean = re.sub("", "", clean)
    clean = re.sub("", "", clean)
    clean = re.sub("", "", clean)  

    return clean


pool = Pool(4)
what_i_want = pool.map(get_whatiwant_from_html, html_content_list)

这段代码异步混合了获取和解析,但我想将多处理集成到其中:

async def process(url, session):
    html = await getRequest(session, url)
    return await get_whatiwant_from_html(html)

async def dispatch(urls):
    async with aiohttp.ClientSession() as session:
        coros = (process(url, session) for url in urls)
        return await asyncio.gather(*coros)

result = asyncio.get_event_loop().run_until_complete(dispatch(urls))

有什么明显的方法可以做到这一点吗?我考虑过创建 4 个进程,每个进程运行异步调用,但实现看起来有点复杂,我想知道是否还有其他方法。

我对 asyncio 和 aiohttp 非常陌生,所以如果您有什么建议我阅读以更好地理解,我会非常高兴。

最佳答案

您可以使用ProcessPoolExecutor .

run_in_executor您可以在主 asyncio 进程中执行 IO。

但是大量的 CPU 计算是在不同的进程中进行的。

async def get_data(session, url, params=None):
    loop = asyncio.get_event_loop()
    async with session.get(url, headers=HEADERS, params=params) as response:
        html = await response.text()
        data = await loop.run_in_executor(None, partial(get_whatiwant_from_html, html))
        return data

async def get_data_from_urls(urls):
    tasks = []
    async with aiohttp.ClientSession() as session:
        for url in urls:
            tasks.append(get_data(session, url))
        result_data = await asyncio.gather(*tasks)
    return result_data

executor = concurrent.futures.ProcessPoolExecutor(max_workers=10)
asyncio_loop.set_default_executor(executor)
results = asyncio_loop.run_until_complete(get_data_from_urls(urls))

关于python - 将 aiohttp 与多处理结合起来,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53965948/

相关文章:

python - 在 Python 中进行优化的冒泡排序

python - 导入错误 : cannot import name 'QtCore'

java - 如何更快地从 Selenium WebElement 获取值?

python - 同时在进程中使用 Celery 并在任务中使用 gevent

python - Python 中 IF-ELSE block 的缩进

python - satchmo nginx 重定向到 https,然后重定向到 http,然后返回

r - R:将数据框的每一行转换为一个列表项

multithreading - 了解 Julia 中的多线程行为

c# - 从 Bitmap 类的颜色填充 160x43 字节数组的更快方法

python - 在python多处理中实现Ctrl+C取消