python-3.x - Python 3 - 多个 AsyncIO 连接

标签 python-3.x chat python-asyncio

我正在尝试学习如何在 Python 3.7 中使用 AsyncIO,但我仍然对其原理有些困惑。

我的目标是编写一个简单的聊天程序,但是我需要使用环形网络拓扑——一个节点只知道它的两个邻居。当消息被发送时,它会被节点传递,直到它再次到达发送者。这意味着每个节点基本上同时是客户端和服务器。

我还需要能够检测到死节点,以便我的环不会损坏。

我认为每个节点为每个邻居都有一个单独的连接可能是一个很好的解决方案 - successorpredecessor

class Node:
    ...
    def run():
        ...
        s = loop.create_connection(lambda: Client(...), addr1, port1)
        p = loop.create_server(lambda: Server(...), addr2, port2)
        successor = loop.run_until_complete(s)
        predecessor = loop.run_until_complete(p)
        loop.run_forever()
        ...
    ...
ServerClient 是实现 asyncio.Protocol 的类。

我想这样做的原因是,如果有消息通过圈子发送,它总是从 predecessor 发送到 successor 。在 connection_lostpredecessor 方法中,我可以检测到它已断开连接并向其 predecessor 发送消息(通过整个环)以连接到我。

我希望能够将从我的 predecessor 收到的消息进一步发送到我的 successor 。我还希望能够将带有我的地址的消息发送到我的 successor 以防我的 predecessor 死了(此消息将从 predecessorServer.connection_lost() 发送,并将一直传递到我死了的 predecessorpredecessor )。

我的问题是: 我可以将接收到的数据从 predecessor 传递给 successor 吗?如果不是,那么使用 AsyncIO 和环形拓扑的这个程序的更好实现是什么?

最佳答案

对于遇到相同问题的 AsyncIO 新手,我自己找到了解决方案。

首先,最好使用 AsyncIO 的高级方面—— streams 。调用 loop.create_connctionloop.create_server 被认为是低级的(一开始我理解错了)。
create_connection 的高级替代方案是 asyncio.open_connection ,它将为您提供一个由 asyncio.StreamReaderasyncio.StreamWriter 组成的元组,您可以使用它来读取和写入打开的连接。当从 StreamReader 读取的数据等于 b'' 或在尝试写入 ConnectionError 时捕获异常 ( StreamWriter ) 时,您还可以检测连接丢失。
create_server 的高级替代方案是 asyncio.start_server ,它需要提供一个回调函数,每次与服务器建立连接(打开连接,接收数据...)时都会调用该回调函数。回调有 StreamReaderStreamWriter 作为参数。也可以通过在写入 b'' 时接收 ConnectionErrorwriter 来检测连接丢失。

协程可以处理多个连接。服务器部分可以有一个协程(它接受来自环拓扑中一个邻居的连接)和一个客户端部分(打开到环中另一个邻居的连接)的协程。 Node 类可以是这样的:

import asyncio

class Node:
    ...
    async def run(self):
        ...
        self.next_reader, self.next_writer = await asyncio.open_connection(self.next_IP, self.next_port)
        server_coro = asyncio.create_task(self.server_init())
        client_coro = asyncio.create_task(self.client_method())
        await client_coro
        await server_coro
        ...

    async def server_init(self):
        server = await asyncio.start_server(self.server_callback, self.IP, self.port)
        async with server:
            await server.serve_forever()

    async def client_method(self):
        ...
        try:
            data = await self.next_reader.read()
        except ConnectionError:
            ...
    ...

请注意,我将 asyncio.create_task 用于协程和(不在代码列表中) asyncio.run(node.run()) ,它们被认为是 asyncio.ensure_future()loop.run_forever() 的高级替代品。这两个都是在 Python 3.7 中添加的,据说 asyncio.run() 是临时的,所以当你阅读本文时,它可能已经被其他东西取代了。

我不是 AsyncIO 专家,因此可能有更好、更简洁的方法来执行此操作(如果您知道,请分享)。

关于python-3.x - Python 3 - 多个 AsyncIO 连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53615584/

相关文章:

python - 根据文件名通过 FTP 上传多个文件

html - 聊天框工作?这是正确的方法吗?

javascript - 聊天应用程序如何为特定聊天生成 id?

email - 有没有办法在带有更新文本的电子邮件中嵌入 iframe?

python - 连接和断开连接时 OPC UA 客户端 python asyncua 错误

python - 如何阻止我的 websocket 连接关闭?

python - 如何在 Python 中找到行梯形矩阵形式(未缩减)?

python - 如何将列表拆分为元组

python - 如何处理错误引发 - Python3 中的 Asyncio

python - 为什么列表解包不能用于索引第二个列表?