python - 为什么我的异步函数是同步运行Python3.9?

标签 python python-3.x python-asyncio python-multithreading concurrent.futures

我正在尝试使用 asynciofutures 在单独的线程上运行函数。我有一个装饰器,它异步获取长时间运行的函数及其参数并输出其值。不幸的是,这些进程似乎不是异步工作的。

def multiprocess(self, function, executor=None, *args, **kwargs):
    async def run_task(function, *args, **kwargs):
        @functools.wraps(function)
        async def wrap(*args, **kwargs):
            while True:
                execution_runner = executor or self._DEFAULT_POOL_
                executed_job = execution_runner.submit(function, *args, **kwargs)
                print(
                    f"Pending {function.__name__}:",
                    execution_runner._work_queue.qsize(),
                    "jobs",
                )
                print(
                    f"Threads: {function.__name__}:", len(execution_runner._threads)
                )
                future = await asyncio.wrap_future(executed_job)
                return future

        return wrap

    return asyncio.run(run_task(function, *args, **kwargs))

为了调用装饰器,我有两个函数_async_tasktask_function_async_task 包含一个循环,为需要处理的每个文档运行 task_function

@staticmethod
def _async_task(documents):
    processed_docs = asyncio.run(task_function(documents))
    return processed_docs

task_function 处理文档中的每个文档,如下所示,

@multiprocess
async def task_function(documents):
    processed_documents = None
    try:
        for doc in documents:
            processed_documents = process_document(doc)
            print(processed_documents)
    except Exception as err:
        print(err)
    return processed_documents

这不能异步工作的线索是我对多线程装饰器的诊断打印以下内容。

Pending summarise_news: 0 jobs
Threads: summarise_news: 2

由于没有待处理的作业,并且整个过程所需的时间与同步运行的时间一样长,因此它是同步运行的。

最佳答案

我在设置您的代码时遇到了一些问题,但我想我已经找到了答案。

首先,正如 @dano 在评论中提到的,asyncio.run 会阻塞,直到协程运行完成。因此,使用这种方法不会获得任何加速。

我使用了稍微修改过的多进程装饰器

def multiprocess(executor=None, *args, **kwargs):
    def run_task(function, *args, **kwargs):
        def wrap(*args, **kwargs):

            execution_runner = executor or DEFAULT_EXECUTOR
            executed_job = execution_runner.submit(function, *args, **kwargs)
            print(
                f"Pending {function.__name__}:",
                execution_runner._work_queue.qsize(),
                "jobs",
            )
            print(
                f"Threads: {function.__name__}:", len(execution_runner._threads)
            )
            future = asyncio.wrap_future(executed_job)

            return future

        return wrap
    return run_task

如您所见,这里没有 asyncio.run,装饰器和内部包装器都是同步的,因为 asyncio.wrap_future 不需要 await.

更新后的multiprocess装饰器现在与process_document函数一起使用。原因是并行化按顺序调用阻塞函数的函数不会获得任何好处。您必须将阻塞函数转换以在执行器中运行。

注意这个虚拟process_document与我所描述的完全一样 - 完全阻塞和同步。

@multiprocess()
def process_document(doc):
    print(f"Processing doc: {doc}...")
    time.sleep(2)
    print(f"Doc {doc} done.")

现在,到最后一点。我们已经通过将 process_document 转换为可在执行器中运行来使其成为异步类型,但您究竟如何调用它仍然很重要。

考虑以下示例:

for doc in documents:
    result = await process_document(doc)
results = await asyncio.gather(*[process_document(doc) for doc in documents])

在前一个中,我们将按顺序等待协程,必须等到一个协程完成后才能启动另一个协程。 在后一个示例中,它们将并行执行,因此它实际上取决于您调用协程执行的准确程度。

这是我使用的完整代码片段:

import asyncio
import concurrent.futures
import time


DEFAULT_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=4)


def multiprocess(executor=None, *args, **kwargs):
    def run_task(function, *args, **kwargs):
        def wrap(*args, **kwargs):

            execution_runner = executor or DEFAULT_EXECUTOR
            executed_job = execution_runner.submit(function, *args, **kwargs)
            print(
                f"Pending {function.__name__}:",
                execution_runner._work_queue.qsize(),
                "jobs",
            )
            print(
                f"Threads: {function.__name__}:", len(execution_runner._threads)
            )
            future = asyncio.wrap_future(executed_job)

            return future

        return wrap
    return run_task


@multiprocess()
def process_document(doc):
    print(f"Processing doc: {doc}...")
    time.sleep(2)
    print(f"Doc {doc} done.")


async def task_function_sequential(documents):
    start = time.time()
    for doc in documents:
        await process_document(doc)

    end = time.time()
    print(f"task_function_sequential took: {end-start}s")


async def task_function_parallel(documents):
    start = time.time()

    jobs = [process_document(doc) for doc in documents]
    await asyncio.gather(*jobs)

    end = time.time()
    print(f"task_function_parallel took: {end-start}s")


async def main():
    documents = [i for i in range(5)]
    await task_function_sequential(documents)
    await task_function_parallel(documents)


asyncio.run(main())

请注意,task_function_parallel 示例仍然需要大约 4 秒,而不是 2 秒,因为线程池限制为 4 个工作线程,并且作业数量为 5,因此最后一个作业将等待让一些 worker 有空。

关于python - 为什么我的异步函数是同步运行Python3.9?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66689161/

相关文章:

python - 如何将提取的数据转换成python字典?

python - 同一图上的条形图/线图,但条形图前面的轴和线图不同

python - 在 2048 中创建一个 shift 函数

python - 即使忽略了 CancelledError,如何取消任务执行?

Python - 使用 pcolormesh 和 basemap 绘图

python - 使用 python-social-auth 在模型中保存 facebook 个人资料图片

python - argparse:将无法识别的选项视为位置?

python - 如何在数学函数中使用获取下一个乘法数字

python-3.7 - python3.7+中是否有任何默认的异步空上下文管理器?

python - 我如何在不事先下载和转换的情况下将音频从 pytube 流式传输到 FFMPEG 和 discord.py