我正在尝试使用 asyncio
和 futures
在单独的线程上运行函数。我有一个装饰器,它异步获取长时间运行的函数及其参数并输出其值。不幸的是,这些进程似乎不是异步工作的。
def multiprocess(self, function, executor=None, *args, **kwargs):
async def run_task(function, *args, **kwargs):
@functools.wraps(function)
async def wrap(*args, **kwargs):
while True:
execution_runner = executor or self._DEFAULT_POOL_
executed_job = execution_runner.submit(function, *args, **kwargs)
print(
f"Pending {function.__name__}:",
execution_runner._work_queue.qsize(),
"jobs",
)
print(
f"Threads: {function.__name__}:", len(execution_runner._threads)
)
future = await asyncio.wrap_future(executed_job)
return future
return wrap
return asyncio.run(run_task(function, *args, **kwargs))
为了调用装饰器,我有两个函数_async_task
和task_function
。 _async_task
包含一个循环,为需要处理的每个文档运行 task_function
。
@staticmethod
def _async_task(documents):
processed_docs = asyncio.run(task_function(documents))
return processed_docs
task_function
处理文档中的每个文档,如下所示,
@multiprocess
async def task_function(documents):
processed_documents = None
try:
for doc in documents:
processed_documents = process_document(doc)
print(processed_documents)
except Exception as err:
print(err)
return processed_documents
这不能异步工作的线索是我对多线程装饰器的诊断打印以下内容。
Pending summarise_news: 0 jobs
Threads: summarise_news: 2
由于没有待处理的作业,并且整个过程所需的时间与同步运行的时间一样长,因此它是同步运行的。
最佳答案
我在设置您的代码时遇到了一些问题,但我想我已经找到了答案。
首先,正如 @dano 在评论中提到的,asyncio.run
会阻塞,直到协程运行完成。因此,使用这种方法不会获得任何加速。
我使用了稍微修改过的多进程
装饰器
def multiprocess(executor=None, *args, **kwargs):
def run_task(function, *args, **kwargs):
def wrap(*args, **kwargs):
execution_runner = executor or DEFAULT_EXECUTOR
executed_job = execution_runner.submit(function, *args, **kwargs)
print(
f"Pending {function.__name__}:",
execution_runner._work_queue.qsize(),
"jobs",
)
print(
f"Threads: {function.__name__}:", len(execution_runner._threads)
)
future = asyncio.wrap_future(executed_job)
return future
return wrap
return run_task
如您所见,这里没有 asyncio.run
,装饰器和内部包装器都是同步的,因为 asyncio.wrap_future
不需要 await
.
更新后的multiprocess
装饰器现在与process_document
函数一起使用。原因是并行化按顺序调用阻塞函数的函数不会获得任何好处。您必须将阻塞函数转换以在执行器中运行。
注意这个虚拟process_document
与我所描述的完全一样 - 完全阻塞和同步。
@multiprocess()
def process_document(doc):
print(f"Processing doc: {doc}...")
time.sleep(2)
print(f"Doc {doc} done.")
现在,到最后一点。我们已经通过将 process_document
转换为可在执行器中运行来使其成为异步类型,但您究竟如何调用它仍然很重要。
考虑以下示例:
for doc in documents:
result = await process_document(doc)
results = await asyncio.gather(*[process_document(doc) for doc in documents])
在前一个中,我们将按顺序等待协程,必须等到一个协程完成后才能启动另一个协程。 在后一个示例中,它们将并行执行,因此它实际上取决于您调用协程执行的准确程度。
这是我使用的完整代码片段:
import asyncio
import concurrent.futures
import time
DEFAULT_EXECUTOR = concurrent.futures.ThreadPoolExecutor(max_workers=4)
def multiprocess(executor=None, *args, **kwargs):
def run_task(function, *args, **kwargs):
def wrap(*args, **kwargs):
execution_runner = executor or DEFAULT_EXECUTOR
executed_job = execution_runner.submit(function, *args, **kwargs)
print(
f"Pending {function.__name__}:",
execution_runner._work_queue.qsize(),
"jobs",
)
print(
f"Threads: {function.__name__}:", len(execution_runner._threads)
)
future = asyncio.wrap_future(executed_job)
return future
return wrap
return run_task
@multiprocess()
def process_document(doc):
print(f"Processing doc: {doc}...")
time.sleep(2)
print(f"Doc {doc} done.")
async def task_function_sequential(documents):
start = time.time()
for doc in documents:
await process_document(doc)
end = time.time()
print(f"task_function_sequential took: {end-start}s")
async def task_function_parallel(documents):
start = time.time()
jobs = [process_document(doc) for doc in documents]
await asyncio.gather(*jobs)
end = time.time()
print(f"task_function_parallel took: {end-start}s")
async def main():
documents = [i for i in range(5)]
await task_function_sequential(documents)
await task_function_parallel(documents)
asyncio.run(main())
请注意,task_function_parallel
示例仍然需要大约 4 秒,而不是 2 秒,因为线程池限制为 4 个工作线程,并且作业数量为 5,因此最后一个作业将等待让一些 worker 有空。
关于python - 为什么我的异步函数是同步运行Python3.9?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/66689161/