python - 在 asyncio 应用程序中将 CPU 密集型任务作为单独的进程运行会导致速度显着减慢

标签 python asynchronous python-asyncio

我有一些 HTML 页面,我试图通过 aiohttpasyncio 使用异步 Web 请求提取文本,提取它们后,我将文件保存在本地。我正在使用 BeautifulSoup(在 extract_text() 下)处理响应中的文本并提取 HTML 页面中的相关文本(排除代码等),但是面临一个问题,即我的同步版本的脚本比我的异步+多处理更快。

据我了解,使用 BeautifulSoup 函数会导致主事件循环在 parse() 内阻塞,因此基于这两个 StackOverflow 问题[ 0 , 1 ],我认为最好的办法是在自己的进程中运行 extract_text() (作为它的 CPU 任务),这应该可以防止事件循环阻塞。

这会导致脚本的运行时间比同步版本(无多重处理)长 1.5 倍

为了确认这不是我的异步代码实现的问题,我删除了 extract_text() 的使用,而是保存了响应对象中的原始文本。这样做使我的异步代码变得更快,表明问题纯粹是由于在单独进程上运行的extract_text()造成的。

我在这里遗漏了一些重要的细节吗?

import asyncio
from asyncio import Semaphore
import json
import logging
from pathlib import Path
from typing import List, Optional

import aiofiles
from aiohttp import ClientSession
import aiohttp
from bs4 import BeautifulSoup
import concurrent.futures
import functools


def extract_text(raw_text: str) -> str:
    return " ".join(BeautifulSoup(raw_text, "html.parser").stripped_strings)


async def fetch_text(
    url: str,
    session: ClientSession,
    semaphore: Semaphore,
    **kwargs: dict,
) -> str:
    async with semaphore:
        response = await session.request(method="GET", url=url, **kwargs)
        response.raise_for_status()
        logging.info("Got response [%s] for URL: %s", response.status, url)
        text = await response.text(encoding="utf-8")
        return text


async def parse(
    url: str,
    session: ClientSession,
    semaphore: Semaphore,
    **kwargs,
) -> Optional[str]:
    try:
        text = await fetch_text(
            url=url,
            session=session,
            semaphore=semaphore,
            **kwargs,
        )
    except (
        aiohttp.ClientError,
        aiohttp.http_exceptions.HttpProcessingError,
    ) as e:
        logging.error(
            "aiohttp exception for %s [%s]: %s",
            url,
            getattr(e, "status", None),
            getattr(e, "message", None),
        )
    except Exception as e:
        logging.exception(
            "Non-aiohttp exception occured:  %s",
            getattr(e, "__dict__", None),
        )
    else:
        loop = asyncio.get_running_loop()
        with concurrent.futures.ProcessPoolExecutor() as pool:
            extract_text_ = functools.partial(extract_text, text)
            text = await loop.run_in_executor(pool, extract_text_)
            logging.info("Found text for %s", url)
            return text


async def process_file(
    url: dict,
    session: ClientSession,
    semaphore: Semaphore,
    **kwargs: dict,
) -> None:
    category = url.get("category")
    link = url.get("link")
    if category and link:
        text = await parse(
            url=f"{URL}/{link}",
            session=session,
            semaphore=semaphore,
            **kwargs,
        )
        if text:
            save_path = await get_save_path(
                link=link,
                category=category,
            )
            await write_file(html_text=text, path=save_path)
        else:
            logging.warning("Text for %s not found, skipping it...", link)


async def process_files(
    html_files: List[dict],
    semaphore: Semaphore,
) -> None:
    async with ClientSession() as session:
        tasks = [
            process_file(
                url=file,
                session=session,
                semaphore=semaphore,
            )
            for file in html_files
        ]
        await asyncio.gather(*tasks)


async def write_file(
    html_text: str,
    path: Path,
) -> None:
    # Write to file using aiofiles
    ...

async def get_save_path(link: str, category: str) -> Path:
    # return path to save
    ...

async def main_async(
    num_files: Optional[int],
    semaphore_count: int,
) -> None:
    html_files = # get all the files to process
    semaphore = Semaphore(semaphore_count)
    await process_files(
        html_files=html_files,
        semaphore=semaphore,
    )


if __name__ == "__main__":
    NUM_FILES = # passed through CLI args
    SEMAPHORE_COUNT = # passed through CLI args
    asyncio.run(
        main_async(
            num_files=NUM_FILES,
            semaphore_count=SEMAPHORE_COUNT,
        )
    )

SnakeViz 图表涵盖 1000 个样本

  1. 具有 extract_text 和多处理功能的异步版本

Async version with extract_text and multiprocessing

  • 不带 extract_text 的异步版本
  • Async version without extract text

  • 与 extract_text 同步版本(注意来自 BeautifulSoup 的 html_parser 如何占用此处的大部分时间)
  • Sync version with extract_text

  • 不带 extract_text 的同步版本
  • enter image description here

    最佳答案

    以下是异步程序的大致功能:

    1. 同时启动 num_files parse() 任务
    2. 每个 parse() 任务都会创建自己的 ProcessPoolExecutor 并异步等待 extract_text(在之前创建的进程池中执行)。

    由于以下几个原因,这并不是最理想的:

    1. 它创建 num_files 个进程池,这些进程池的创建成本很高并且占用内存
    2. 每个池仅用于一个操作,这会适得其反:应将尽可能多的并发操作提交给给定池

    每次调用 parse() 函数时,您都会创建一个新的 ProcessPoolExecutor。您可以尝试实例化它一次(例如作为全局变量,通过函数参数传递):

    from concurrent.futures import ProcessPoolExecutor
    
    async def parse(loop, executor, ...):
      ...
      text = await loop.run_in_executor(executor, extract_text)
    
    # and then in `process_file` (or `process_files`):
    
    async def process_file(...):
      ...
      loop = asyncio.get_running_loop()
      with ProcessPoolExecutor() as executor:
        ...
        await process(loop, executor, ...)
    

    我对在我的旧 MacBook Air 2015 上创建 ProcessPoolExecutor 的开销进行了基准测试,结果表明它非常慢(池创建、打开、提交和关闭几乎需要 100 毫秒):

    from time import perf_counter
    from concurrent.futures import ProcessPoolExecutor
    
    def main_1():
        """Pool crated once"""
        reps = 100
        t1 = perf_counter()
        with ProcessPoolExecutor() as executor:
            for _ in range(reps):
                executor.submit(lambda: None) 
        t2 = perf_counter()   
        print(f"{(t2 - t1) / reps * 1_000} ms")  # 2 ms/it
    
    def main_2():
        """Pool created at each iteration"""
        reps = 100
        t1 = perf_counter()
        for _ in range(reps):
            with ProcessPoolExecutor() as executor:
                executor.submit(lambda: None) 
        t2 = perf_counter()   
        print(f"{(t2 - t1) / reps * 1_000} ms")  # 100 ms/it
    
    if __name__ == "__main__":
        main_1()
        main_2()
    

    您可以再次在 process_files 函数中提升它,这样可以避免为每个文件重新创建池。

    此外,请尝试更仔细地检查您的第一个 SnakeViz 图表,以便了解 process.py:submit 中到底是什么花费了这么多时间。


    最后一件事,请注意在执行器上使用上下文管理器的语义:

    from concurrent.futures import ProcessPoolExecutor
    
    with ProcessPoolExecutor() as executor:
      for i in range(100):
        executor.submit(some_work, i)
    

    这不仅会创建、执行并向其提交工作,而且还会等待所有工作完成,然后再退出 with 语句。

    关于python - 在 asyncio 应用程序中将 CPU 密集型任务作为单独的进程运行会导致速度显着减慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72870683/

    相关文章:

    python - 列表中列表的随机播放功能

    python - 程序计算每个行项目的税率等于增值税

    c++ - 如何有效地使用 boost::process::async_pipe 进行写入和读取?

    python - Queue.asyncio值错误: task_done() called too many times - Coding error or a bug detected?

    python - 如何使用 asyncio 循环遍历不确定长度的迭代器

    python - Stripe API : filter for Payout ID when I have destination ID (bank ID)

    python - 如何在 Python 中获得 XMPP 客户端的响应

    javascript - 让 async/await 与 superagent 一起工作

    JavaScript、异步、锁?

    python - 如何从由于超时而取消的 python asyncio 协程返回值