python - 如何从同步例程通知异步例程?

标签 python asynchronous

我有一个异步 websockets 监听器。该监听器从同步主循环传递消息。
我想让异步 websockets 监听器知道有一条新消息要发送。

目前我使用轮询循环(坏)实现了这一点。我尝试使用 cond.notify_all() 但不能在异步代码之外使用?

代码片段:

ws_data = {}
ws_data_lock = threading.Lock()

async def ws_serve(websocket, path):
    global ws_data
    global ws_data_lock

    listen_pair = await websocket.recv()

    p_fen = None

    while True:
        send = None

        with ws_data_lock:
            if p_fen == None or ws_data[listen_pair] != p_fen:
                send = p_fen = ws_data[listen_pair]

        if send:
            await websocket.send(send)

        await asyncio.sleep(0.25)

...
def run_websockets_server():
    start_server = websockets.serve(ws_serve, ws_interface, ws_port)

    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()

def client_listener():
    while True:
        with ws_data_lock:
            ws_data[pair_id] = (p1_user, p2_user, time.time())
            # here I would like to let all websocket listeners know that
            # there's new data

t = threading.Thread(target=client_listener)
t.start()

run_websockets_server()

最佳答案

我的第一 react 是:转向单一并发模型。要么始终使用线程,要么始终使用协程(有限地使用线程池来处理 asyncio 无法完成的事情)。

您的项目没有充分理由尝试混合这两种模型。我怀疑你开始使用 asyncio 是因为 Python websockets library在已经选择线程之后。项目的其余部分也可以使用协程构建(例如使用 aiomysql 来处理数据库连接等)。

但是,你仍然可以将这两种模型结合起来,但是你需要研究 how to use it in combination with threads 上的 asyncio 文档。 .具体来说,要将信息从线程发送到您的协程,您需要使用这两个函数:

  • asyncio.run_coroutine_threadsafe(coro, loop) 允许您将协程添加到正在运行的循环中,并使用 Future object 监视该协程如果您需要返回任何东西或需要能够取消例程。
  • loop.call_soon_threadsafe(callback, *args) 允许您在与循环相同的线程中调用同步函数。这对于从另一个线程调用的回调很有用(例如,您可以在 asyncio.Future() object 上有一个协程等待,并有一个回调函数在该 future 对象上设置结果,因此将结果传递给协程)。

  • 在您的情况下,如果您想将数据发送到所有当前的 websocket 连接,我会使用:
  • 队列映射,每个键都处于事件状态 ws_serve任务。 ws_serve任务将它们自己的队列添加到这个映射中,并在它们自己之后进行清理。任务然后从他们自己的队列中选择要发送的项目。
  • 一个协程,在执行时向所有队列添加信息。
  • 其他线程可以使用asyncio.run_coroutine_threadsafe()执行添加到队列的协程。

  • 这里不需要使用锁定;协程的并发问题要少得多,只要没有 await,协程修改字典就不是问题。 s 在操作期间(包括对所有队列的迭代)。

    如果将 queues 字典封装在上下文管理器中,则可以更轻松地确保正确清理队列:
    # asyncio section, no thread access
    import asyncio
    from contextlib import AbstractContextManager
    
    
    class WSSendQueues(AbstractContextManager):
        def __init__(self):
            self._queues = {}
    
        async def send_to_all(self, item):
            for queue in self._queues. values():
                queue.put_nowait(item)
    
        def __enter__(self):
            task = asyncio.current_task()
            self._queues[task] = queue = asyncio.Queue()
            return queue
    
        def __exit__(self, exc_type, exc_value, traceback):
            task = asyncio.current_task()
            self._queues.pop(task, None)
    
    # global instance of the queues manager
    # this has a coroutine `send_to_all()`
    ws_queues = WSSendQueues()
    
    def ws_serve(websocket, path):
        with ws_queues as queue:
            listen_pair = await websocket.recv()
    
            while True:
                to_send = await queue.get()  # blocks until something is available
                try:
                    await websocket.send(to_send)
                finally:
                    # let the queue know we handled the item
                    queue.task_done()
    
    def run_websockets_server(loop):
        start_server = websockets.serve(ws_serve, ws_interface, ws_port)
    
        loop.run_until_complete(start_server)
        loop.run_forever()
    
    # reference to the asyncio loop *used for the main thread*
    main_thread_loop = asyncio.get_event_loop()
    
    # threads section, need access to the main_thread_loop to schedule
    # coroutines
    
    def client_listener():
        while True:
            # create the coroutine. THIS DOESN'T RUN IT YET.
            coro = ws_queues.send_to_all((p1_user, p2_user, time.time()))
    
            # and schedule it to run on the loop. From here on the
            # websockets will automatically receive the data on their respective queues.
            asyncio.run_coroutine_threadsafe(coro, main_thread_loop)
    
    
    # starting the threads and event loop
    t = threading.Thread(target=client_listener)
    t.start()
    
    run_websockets_server(main_thread_loop)
    

    您的代码尚未处理关闭,但我确实准备了上述内容以允许正常关闭 asyncio websocket。

    您开始时不再向队列添加数据,因此关闭向队列添加数据的线程。然后你想等待所有 Queue.join() coroutines这样您就知道所有套接字都已完成将数据发送出去。我会为此添加一个超时,在这里永远等待没有意义。您可以将其设为上下文管理器上的协程:
    async def join(self, timeout=None):
        """Wait for all the websocket queues to be empty
    
        If timeout is not none, limit the amount of time to wait.
        """
        tasks = [asyncio.create_task(q.join()) for q in self._queues.values()]
        done, pending = asyncio.wait(tasks, timeout=timeout)
        # cancel any remaining joins
        for task in pending:
            task.cancel()
    

    一旦你在队列中等待(最好有时间限制),你会 shut down the websockets server并关闭循环。当然,所有这些都是通过您在主线程上安排的协程完成的。

    关于python - 如何从同步例程通知异步例程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58642559/

    相关文章:

    Python,从矩阵到数组

    python - 如何在 Python 3.7 中向 multiprocessing.connection.Client(..) 添加超时?

    python - 为什么在gensim word2vec中创建了多个模型文件?

    Python Reduce 条件表达式

    javascript - 在javascript中异步播放声音?

    javascript - 等待的 promise 仍在返回 <pending>

    c++ - 检测 grpc 服务器中关闭的客户端连接

    python - 在Python中大规模连接字符串

    c# - Xamarin Android DownloadString 需要很长时间甚至无法工作

    javascript - 哪种方式更方便在无限循环中执行异步函数,并在每次执行之间进行 sleep (Node.js/Vanilla JS)