python - 调用 thread.join() 会阻塞异步上下文中的事件循环吗?

标签 python multithreading python-asyncio

我正在使用 aiohttp 实现网络 API , 使用 gunicorn 部署启用 UVloop --worker-class aiohttp.GunicornUVLoopWebWorker。因此,我的代码总是在异步上下文中运行。我有在处理请求时实现并行作业以获得更好性能的想法。

我没有使用 asyncio 因为我想要 Parallelism , 不是 Concurrency .

我知道 multiprocessingGIL problem在 python 。但加入进程也适用于我的问题。

这是一个例子:

from aiohttp.web import middleware

@middleware
async def context_init(request, handler):
    request.context = {}
    request.context['threads'] = []
    ret = await handler(request)
    for thread in request.context['threads']:
        thread.join()
    return ret

考虑到 thread.join()process.join() 会阻塞当前线程,这会阻塞事件循环(据我所知).如何异步加入?我想要的可以形象地表示为:await thread.join()await process.join()

更新:

感谢@user4815162342,我能够为我的项目编写正确的代码:

中间件:

from aiohttp.web import middleware
from util.process_session import ProcessSession

@middleware
async def context_init(request, handler):
    request.context = {}
    request.context['process_session'] = ProcessSession()
    request.context['processes'] = {}
    ret = await handler(request)
    await request.context['process_session'].wait_for_all()
    return ret

用途:

import asyncio
import concurrent.futures
from functools import partial

class ProcessSession():

    def __init__(self):
        self.loop = asyncio.get_running_loop()
        self.pool = concurrent.futures.ProcessPoolExecutor()
        self.futures = []

    async def wait_for_all(self):
        await asyncio.wait(self.futures)

    def add_process(self, f, *args, **kwargs):
        ret = self.loop.run_in_executor(self.pool, partial(f, *args, **kwargs))
        self.futures.append(ret)
        return ret

class ProcessBase():

    def __init__(self, process_session, f, *args, **kwargs):
        self.future = process_session.add_process(f, *args, **kwargs)

    async def wait(self):
        await asyncio.wait([self.future])
        return self.future.result()

最佳答案

回答你的问题:是的,它确实阻止了事件循环。

我发现 ThreadPoolExecutor 在这种情况下工作得很好。

from util.process_session import ProcessSession
from concurrent.futures.thread import ThreadPoolExecutor
import asyncio

from aiohttp.web import middleware

@middleware
async def context_init(request, handler):
    request.context = {}
    request.context['threads'] = []
    ret = await handler(request)
    with ThreadPoolExecutor(1) as executor:
           await asyncio.get_event_loop().run_in_executor(executor, 
           functools.partial(join_threads, data={
             'threads': request.context['threads']
           }))
    return ret

def join_threads(threads):
    for t in threads:
        t.join()

关于python - 调用 thread.join() 会阻塞异步上下文中的事件循环吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53123981/

相关文章:

PYTHON:if not i.find ("Hey") 和 if i.find ("Hey") 之间的区别 == -1

python - 如何在基于 Alpine 镜像的 Docker 化应用程序中启用 WebSocket 连接(WebSocketAddressException : [Errno -3])?

java - 如何以多线程方式调用返回List元素的方法

Android - 线程与 AlarmManager

python - 在嵌套循环中使用 asyncio nested_future() 和 gather()

python - `asyncio.wait([asyncio.sleep(5)])` 和 `asyncio.sleep(5)` 之间的区别

python - 如何为 480p 和 1080p 视频获得类似于 youtube 的 ffmpeg 配置?我有一个功能,但输出质量太低

python - 从 Python 中的字符串中删除奇怪的隐藏字符

java - 使用Jedis 2.8.0对吞吐量进行基准测试的Redis Cluster的可伸缩性

python-3.x - 同步调用协程