我有一个异步 websockets 监听器。该监听器从同步主循环传递消息。
我想让异步 websockets 监听器知道有一条新消息要发送。
目前我使用轮询循环(坏)实现了这一点。我尝试使用 cond.notify_all() 但不能在异步代码之外使用?
代码片段:
ws_data = {}
ws_data_lock = threading.Lock()
async def ws_serve(websocket, path):
global ws_data
global ws_data_lock
listen_pair = await websocket.recv()
p_fen = None
while True:
send = None
with ws_data_lock:
if p_fen == None or ws_data[listen_pair] != p_fen:
send = p_fen = ws_data[listen_pair]
if send:
await websocket.send(send)
await asyncio.sleep(0.25)
...
def run_websockets_server():
start_server = websockets.serve(ws_serve, ws_interface, ws_port)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
def client_listener():
while True:
with ws_data_lock:
ws_data[pair_id] = (p1_user, p2_user, time.time())
# here I would like to let all websocket listeners know that
# there's new data
t = threading.Thread(target=client_listener)
t.start()
run_websockets_server()
最佳答案
我的第一 react 是:转向单一并发模型。要么始终使用线程,要么始终使用协程(有限地使用线程池来处理 asyncio 无法完成的事情)。
您的项目没有充分理由尝试混合这两种模型。我怀疑你开始使用 asyncio 是因为 Python websockets
library在已经选择线程之后。项目的其余部分也可以使用协程构建(例如使用 aiomysql 来处理数据库连接等)。
但是,你仍然可以将这两种模型结合起来,但是你需要研究 how to use it in combination with threads 上的 asyncio 文档。 .具体来说,要将信息从线程发送到您的协程,您需要使用这两个函数:
asyncio.run_coroutine_threadsafe(coro, loop)
允许您将协程添加到正在运行的循环中,并使用 Future
object 监视该协程如果您需要返回任何东西或需要能够取消例程。 loop.call_soon_threadsafe(callback, *args)
允许您在与循环相同的线程中调用同步函数。这对于从另一个线程调用的回调很有用(例如,您可以在 asyncio.Future()
object 上有一个协程等待,并有一个回调函数在该 future 对象上设置结果,因此将结果传递给协程)。 在您的情况下,如果您想将数据发送到所有当前的 websocket 连接,我会使用:
ws_serve
任务。 ws_serve
任务将它们自己的队列添加到这个映射中,并在它们自己之后进行清理。任务然后从他们自己的队列中选择要发送的项目。 asyncio.run_coroutine_threadsafe()
执行添加到队列的协程。 这里不需要使用锁定;协程的并发问题要少得多,只要没有
await
,协程修改字典就不是问题。 s 在操作期间(包括对所有队列的迭代)。如果将 queues 字典封装在上下文管理器中,则可以更轻松地确保正确清理队列:
# asyncio section, no thread access
import asyncio
from contextlib import AbstractContextManager
class WSSendQueues(AbstractContextManager):
def __init__(self):
self._queues = {}
async def send_to_all(self, item):
for queue in self._queues. values():
queue.put_nowait(item)
def __enter__(self):
task = asyncio.current_task()
self._queues[task] = queue = asyncio.Queue()
return queue
def __exit__(self, exc_type, exc_value, traceback):
task = asyncio.current_task()
self._queues.pop(task, None)
# global instance of the queues manager
# this has a coroutine `send_to_all()`
ws_queues = WSSendQueues()
def ws_serve(websocket, path):
with ws_queues as queue:
listen_pair = await websocket.recv()
while True:
to_send = await queue.get() # blocks until something is available
try:
await websocket.send(to_send)
finally:
# let the queue know we handled the item
queue.task_done()
def run_websockets_server(loop):
start_server = websockets.serve(ws_serve, ws_interface, ws_port)
loop.run_until_complete(start_server)
loop.run_forever()
# reference to the asyncio loop *used for the main thread*
main_thread_loop = asyncio.get_event_loop()
# threads section, need access to the main_thread_loop to schedule
# coroutines
def client_listener():
while True:
# create the coroutine. THIS DOESN'T RUN IT YET.
coro = ws_queues.send_to_all((p1_user, p2_user, time.time()))
# and schedule it to run on the loop. From here on the
# websockets will automatically receive the data on their respective queues.
asyncio.run_coroutine_threadsafe(coro, main_thread_loop)
# starting the threads and event loop
t = threading.Thread(target=client_listener)
t.start()
run_websockets_server(main_thread_loop)
您的代码尚未处理关闭,但我确实准备了上述内容以允许正常关闭 asyncio websocket。
您开始时不再向队列添加数据,因此关闭向队列添加数据的线程。然后你想等待所有
Queue.join()
coroutines这样您就知道所有套接字都已完成将数据发送出去。我会为此添加一个超时,在这里永远等待没有意义。您可以将其设为上下文管理器上的协程:async def join(self, timeout=None):
"""Wait for all the websocket queues to be empty
If timeout is not none, limit the amount of time to wait.
"""
tasks = [asyncio.create_task(q.join()) for q in self._queues.values()]
done, pending = asyncio.wait(tasks, timeout=timeout)
# cancel any remaining joins
for task in pending:
task.cancel()
一旦你在队列中等待(最好有时间限制),你会 shut down the websockets server并关闭循环。当然,所有这些都是通过您在主线程上安排的协程完成的。
关于python - 如何从同步例程通知异步例程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58642559/