python - 如何在不阻塞主线程的情况下执行 "fire and forget"任务?

标签 python multithreading asynchronous

我想到的是一个非常通用的 BackgroundTask 类,可以在网络服务器或独立脚本中使用,以安排不需要阻塞的任务。

我不想在这里使用任何任务队列(celery、rabbitmq 等),因为我考虑的任务太小且运行速度太快。只是想让他们尽可能地完成。那会是异步方法吗?将它们扔到另一个进程中?

我想出的第一个可行的解决方案:

# Need ParamSpec to get correct type hints in BackgroundTask init
P = ParamSpec("P")


class BackgroundTask(metaclass=ThreadSafeSingleton):
    """Easy way to create a background task that is not dependent on any webserver internals.

    Usage:
        async def sleep(t):
            time.sleep(t)

        BackgroundTask(sleep, 10) <- Creates async task and executes it separately (nonblocking, works with coroutines)
        BackgroundTask(time.sleep, 9) <- Creates async task and executes it separately (nonblocking, works with normal functions)
    """

    background_tasks = set()
    lock = threading.Lock()

    def __init__(self, func: typing.Callable[P, typing.Any], *args: P.args, **kwargs: P.kwargs) -> None:
        """Uses singleton instance of BackgroundTask to add a task to the async execution queue.

        Args:
            func (typing.Callable[P, typing.Any]): _description_
        """
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.is_async = asyncio.iscoroutinefunction(func)

    async def __call__(self) -> None:
        if self.is_async:
            with self.lock:
                task = asyncio.create_task(self.func(*self.args, **self.kwargs))
                self.background_tasks.add(task)
                print(len(self.background_tasks))
                task.add_done_callback(self.background_tasks.discard)

        # TODO: Create sync task (this will follow a similar pattern)


async def create_background_task(func: typing.Callable[P, typing.Any], *args: P.args, **kwargs: P.kwargs) -> None:
    b = BackgroundTask(func, *args, **kwargs)
    await b()


# Usage:
async def sleep(t):
    time.sleep(t)

await create_background_task(sleep, 5)

我想我这样做错过了重点。如果我将此代码与其他一些异步代码一起运行,那么是的,我将获得性能优势,因为阻塞操作不再阻塞主线程。

我在想我可能需要更像一个单独的进程来处理这样的后台任务而不阻塞主线程(上面的异步代码仍将在主线程上运行)。

有一个单独的线程来处理后台作业是否有意义?就像一个简单的作业队列,但非常轻量级,不需要额外的基础设施?

或者创建一个像上面这样的解决方案有意义吗?

我看到 Starlette 做了这样的事情 (https://github.com/encode/starlette/blob/decc5279335f105837987505e3e477463a996f3e/starlette/background.py#L15),但他们在返回响应后等待后台任务。

这使得他们的解决方案依赖于 Web 服务器设计(即在发送响应后执行操作是可以的)。我想知道我们是否可以构建更通用的东西,您可以在脚本或网络服务器中运行后台任务,而不会牺牲性能。

对异步/并发功能不太熟悉,所以不知道如何比较这些解决方案。似乎是一个有趣的问题!

这是我尝试在另一个进程上执行任务时的想法:


class BackgroundTask(metaclass=ThreadSafeSingleton):
    """Easy way to create a background task that is not dependent on any webserver internals.

    Usage:
        async def sleep(t):
            time.sleep(t)

        BackgroundTask(sleep, 10) <- Creates async task and executes it separately (nonblocking, works with coroutines)
        BackgroundTask(time.sleep, 9) <- Creates async task and executes it separately (nonblocking, works with normal functions)
        BackgroundTask(es.transport.close) <- Probably most common use in our codebase
    """

    background_tasks = set()
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=2)
    lock = threading.Lock()

    def __init__(self, func: typing.Callable[P, typing.Any], *args: P.args, **kwargs: P.kwargs) -> None:
        """Uses singleton instance of BackgroundTask to add a task to the async execution queue.

        Args:
            func (typing.Callable[P, typing.Any]): _description_
        """
        self.func = func
        self.args = args
        self.kwargs = kwargs
        self.is_async = asyncio.iscoroutinefunction(func)

    async def __call__(self) -> None:
        if self.is_async:
            with self.lock:
                loop = asyncio.get_running_loop()
                with self.executor as pool:
                    result = await loop.run_in_executor(
                        pool, functools.partial(self.func, *self.args, **self.kwargs))

最佳答案

你的问题太抽象了,我会尽量给出所有问题的共同答案。

How can I "fire and forget" a task without blocking main thread?

这取决于你所说的忘记是什么意思。

  • 如果您不打算在运行后访问该任务,您可以在并行进程中运行它。
  • 如果主应用程序应该能够访问后台任务,那么您应该有一个事件驱动的架构。在这种情况下,以前称为任务的东西将是服务或微服务。

I don't want to use any task queues (celery, rabbitmq, etc.) here because the tasks I'm thinking of are too small and fast to run. Just want to get them done as out of the way as possible. Would that be an async approach? Throwing them onto another process?

如果它包含循环或其他 CPU 密集型操作,则有权使用子进程。如果任务发出请求(异步)、读取文件、记录到 stdout 或其他 I/O 绑定(bind)操作,那么使用协程或线程是正确的。

Does it make sense to have a separate thread that handles background jobs? Like a simple job queue but very lightweight and does not require additional infrastructure?

我们不能只使用一个线程,因为它可能会被另一个使用 CPU 密集型操作的任务阻塞。相反,我们可以运行一个后台进程并使用管道、队列和事件在进程之间进行通信。遗憾的是,我们无法提供进程之间的复杂对象,但我们可以提供基本的数据结构来处理后台运行的任务的状态变化。

关于StarletteBackgroundTask

Starlette 是一个轻量级的 ASGI 框架/工具包,非常适合用 Python 构建异步 Web 服务。 (自述文件描述)

它基于并发。因此,即使这也不是所有类型任务的通用解决方案。 注意:并发不同于并行。

I'm wondering if we can build something more generic where you can run background tasks in scripts or webservers alike, without sacrificing performance.

上述解决方案建议使用后台进程。尽管如此,这仍取决于应用程序设计,因为您必须执行通信和同步运行进程(任务)所需的操作(发出事件、向队列添加指示符等)。没有通用的工具,但有根据情况的解决方案。

情况 1 - 任务是异步函数

假设我们有一个 request 函数应该调用 API 而不会阻塞其他任务的工作。此外,我们还有一个不应阻塞任何东西的 sleep 函数。

import asyncio
import aiohttp


async def request(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            try:
                return await response.json()
            except aiohttp.ContentTypeError:
                return await response.read()


async def sleep(t):
    await asyncio.sleep(t)


async def main():
    background_task_1 = asyncio.create_task(request("https://google.com/"))
    background_task_2 = asyncio.create_task(sleep(5))

    ...  # here we can do even CPU-bound operations

    result1 = await background_task_1

    ...  # use the 'result1', etc.

    await background_task_2


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.close()

在这种情况下,我们使用 asyncio.create_task同时运行协程(如在后台)。当然我们可以在子进程中运行它,但没有理由这样做,因为它会使用更多资源而不会提高性能。

情况 2 - 任务是同步函数(I/O 绑定(bind))

与函数已经异步的第一种情况不同,在这种情况下,它们是同步的,但不受 CPU 限制(I/O 限制)。这提供了在线程中运行它们或使它们异步(使用 asyncio.to_thread )并同时运行的能力。

import time
import asyncio
import requests


def asynchronous(func):
    """
    This decorator converts a synchronous function to an asynchronous
    
    Usage:
        @asynchronous
        def sleep(t):
            time.sleep(t)
            
        async def main():
            await sleep(5)
    """
    
    async def wrapper(*args, **kwargs):
        await asyncio.to_thread(func, *args, **kwargs)

    return wrapper


@asynchronous
def request(url):
    with requests.Session() as session:
        response = session.get(url)
        try:
            return response.json()
        except requests.JSONDecodeError:
            return response.text


@asynchronous
def sleep(t):
    time.sleep(t)

    
async def main():
    background_task_1 = asyncio.create_task(request("https://google.com/"))
    background_task_2 = asyncio.create_task(sleep(5))
    ...

这里我们使用装饰器将同步(I/O 绑定(bind))函数转换为异步函数,并像第一种情况一样使用它们。

情况 3 - 任务是同步函数(受 CPU 限制)

要在后台并行运行 CPU 密集型任务,我们必须使用多处理。为了确保任务完成,我们使用 join方法。

import time
import multiprocessing


def task():
    for i in range(10):
        time.sleep(0.3)


def main():
    background_task = multiprocessing.Process(target=task)
    background_task.start()

    ...  # do the rest stuff that does not depend on the background task

    background_task.join()  # wait until the background task is done

    ...  # do stuff that depends on the background task


if __name__ == "__main__":
    main()

假设主应用程序依赖于后台任务的部分。在这种情况下,我们需要一个 event - 驱动设计作为 join 不能被多次调用。

import multiprocessing

event = multiprocessing.Event()


def task():
    ...  # synchronous operations

    event.set()  # notify the main function that the first part of the task is done

    ...  # synchronous operations

    event.set()  # notify the main function that the second part of the task is also done

    ...  # synchronous operations


def main():
    background_task = multiprocessing.Process(target=task)
    background_task.start()

    ...  # do the rest stuff that does not depend on the background task

    event.wait()  # wait until the first part of the background task is done

    ...  # do stuff that depends on the first part of the background task

    event.wait()  # wait until the second part of the background task is done

    ...  # do stuff that depends on the second part of the background task

    background_task.join()  # wait until the background task is finally done

    ...  # do stuff that depends on the whole background task


if __name__ == "__main__":
    main()

正如您已经注意到的事件,我们可以只提供二进制信息,如果进程超过两个,则这些信息无效(不可能知道事件是从哪里发出的)。所以我们使用pipes , queues , 和 manager在进程之间提供非二进制信息。

关于python - 如何在不阻塞主线程的情况下执行 "fire and forget"任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72593019/

相关文章:

python - NameError: 函数名未定义

C - pthread条件变量

c# - Parallel.ForEach 没有执行该方法

ios - 异步数据调用 & CoreAnimation

ios - TableView 和 AFNetworking 异步调用

python - 捕获 Bottle 服务器错误

python - 如何处理 PythonCard 中的 mouseMiddleDrag 事件?

python - 将字典的值从列表转换为 float

ios - GMSThreadException' 原因 : 'The API method must be called from the main thread'

c#:在完成之前经过一定时间后重新启动异步任务