I am implementing a web API with aiohttp, deployed with gunicorn using the uvloop worker class (--worker-class aiohttp.GunicornUVLoopWebWorker), so my code always runs in an async context. I had the idea of running jobs in parallel while handling a request, for better performance.
I am not using asyncio for this because I want parallelism, not concurrency.
I am aware of multiprocessing and the GIL problem in Python, but joining processes also applies to my question.
Here is an example:

from aiohttp.web import middleware

@middleware
async def context_init(request, handler):
    request.context = {}
    request.context['threads'] = []
    ret = await handler(request)
    for thread in request.context['threads']:
        thread.join()
    return ret
Considering that thread.join() or process.join() blocks the calling thread, this would block the event loop (as far as I know). How can I join asynchronously? What I want could be figuratively expressed as: await thread.join() or await process.join().
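The pattern asked about here can be sketched in miniature: a blocking join() handed off to an executor becomes awaitable, so the event loop stays free while the worker thread finishes. This is a minimal, self-contained sketch; the `work` function and the `join_in_executor` helper are made-up names for illustration:

```python
import asyncio
import threading
import time

results = []

def work():
    # stand-in for a blocking job running in a separate thread
    time.sleep(0.05)
    results.append("done")

async def join_in_executor(thread):
    # a blocking thread.join() offloaded to the default executor
    # becomes awaitable, so the event loop is not blocked
    await asyncio.get_running_loop().run_in_executor(None, thread.join)

async def main():
    t = threading.Thread(target=work)
    t.start()
    await join_in_executor(t)      # figuratively, "await thread.join()"
    return t.is_alive()

still_alive = asyncio.run(main())
```

While the join is parked in the executor, other coroutines scheduled on the loop keep running.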
Update:
Thanks to @user4815162342, I was able to write proper code for my project:
Middleware:
from aiohttp.web import middleware
from util.process_session import ProcessSession

@middleware
async def context_init(request, handler):
    request.context = {}
    request.context['process_session'] = ProcessSession()
    request.context['processes'] = {}
    ret = await handler(request)
    await request.context['process_session'].wait_for_all()
    return ret
Utility module:

import asyncio
import concurrent.futures
from functools import partial

class ProcessSession():
    def __init__(self):
        self.loop = asyncio.get_running_loop()
        self.pool = concurrent.futures.ProcessPoolExecutor()
        self.futures = []

    async def wait_for_all(self):
        await asyncio.wait(self.futures)

    def add_process(self, f, *args, **kwargs):
        ret = self.loop.run_in_executor(self.pool, partial(f, *args, **kwargs))
        self.futures.append(ret)
        return ret

class ProcessBase():
    def __init__(self, process_session, f, *args, **kwargs):
        self.future = process_session.add_process(f, *args, **kwargs)

    async def wait(self):
        await asyncio.wait([self.future])
        return self.future.result()
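Outside of aiohttp, the session can be exercised on its own. In the sketch below the executor is injectable so that a ThreadPoolExecutor can stand in for the ProcessPoolExecutor, keeping the example portable (a ProcessPoolExecutor works the same way, but requires picklable functions and, on some platforms, an `if __name__ == '__main__'` guard); `cpu_task` is a made-up example job:

```python
import asyncio
import concurrent.futures
from functools import partial

class ProcessSession:
    # same structure as the utility module above, with the pool injectable
    def __init__(self, pool=None):
        self.loop = asyncio.get_running_loop()
        self.pool = pool or concurrent.futures.ProcessPoolExecutor()
        self.futures = []

    def add_process(self, f, *args, **kwargs):
        fut = self.loop.run_in_executor(self.pool, partial(f, *args, **kwargs))
        self.futures.append(fut)
        return fut

    async def wait_for_all(self):
        # asyncio.wait needs a non-empty set of futures
        if self.futures:
            await asyncio.wait(self.futures)

def cpu_task(n):
    # made-up stand-in for a CPU-bound job
    return n * n

async def main():
    # a thread pool stands in for the process pool in this demonstration
    session = ProcessSession(pool=concurrent.futures.ThreadPoolExecutor())
    fut = session.add_process(cpu_task, 7)
    await session.wait_for_all()
    return fut.result()

result = asyncio.run(main())
```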
Best answer
To answer your question: yes, it does block the event loop.
I found that ThreadPoolExecutor works pretty well in this situation.
import asyncio
import functools
from concurrent.futures.thread import ThreadPoolExecutor
from aiohttp.web import middleware

@middleware
async def context_init(request, handler):
    request.context = {}
    request.context['threads'] = []
    ret = await handler(request)
    with ThreadPoolExecutor(1) as executor:
        await asyncio.get_event_loop().run_in_executor(
            executor,
            functools.partial(join_threads, threads=request.context['threads']))
    return ret

def join_threads(threads):
    for t in threads:
        t.join()
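A note on the shape of this fix: the future returned by run_in_executor is itself awaitable, and on Python 3.9+ the same offloading can be written more directly with asyncio.to_thread, which wraps the default thread pool. A minimal sketch, where `blocking_task` is a made-up stand-in for any blocking call such as thread.join():

```python
import asyncio
import time

def blocking_task():
    # stand-in for a blocking call such as thread.join()
    time.sleep(0.05)
    return "finished"

async def main():
    # asyncio.to_thread (Python 3.9+) runs the callable in the default
    # thread pool and returns an awaitable, keeping the event loop free
    return await asyncio.to_thread(blocking_task)

outcome = asyncio.run(main())
```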
Regarding "python - Does calling thread.join() block the event loop in an asynchronous context?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/53123981/