我正在寻找在来自 concurrent.futures 的线程池执行程序中运行的异步任务和方法/函数之间通信的最佳解决方案。在之前的同步项目中,我会使用 queue.Queue
类(class)。我假设任何方法都应该是线程安全的,因此 asyncio.queue
不管用。
我见过有人扩展 queue.Queue
类做类似的事情:
class async_queue(Queue):
async def aput(self, item):
self.put_nowait(item)
async def aget(self):
resp = await asyncio.get_event_loop().run_in_executor( None, self.get )
return resp
有没有更好的办法?
最佳答案
我建议反过来:使用 asyncio.Queue
类在两个世界之间进行交流。这样做的好处是不必在线程池中花费一个槽来处理需要很长时间才能完成的操作,例如 get()
.
下面是一个例子:
class Queue:
def __init__(self):
self._loop = asyncio.get_running_loop()
self._queue = asyncio.Queue()
def sync_put_nowait(self, item):
self._loop.call_soon(self._queue.put_nowait, item)
def sync_put(self, item):
asyncio.run_coroutine_threadsafe(self._queue.put(item), self._loop).result()
def sync_get(self):
return asyncio.run_coroutine_threadsafe(self._queue.get(item), self._loop).result()
def async_put_nowait(self, item):
self._queue.put_nowait(item)
async def async_put(self, item):
await self._queue.put(item)
async def async_get(self):
return await self._queue.get()
以
sync_
为前缀的方法旨在由同步代码调用(在事件循环线程之外运行)。前缀为 async_
的那些由在事件循环线程中运行的代码调用,无论它们是否实际上是协程。 (例如 put_nowait
不是协程,但它仍然必须区分同步和异步版本。)
关于python - python中异步任务和同步线程之间的通信,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59650243/