python - 使用 asyncio 的简单 Python TCP fork 服务器

标签 python python-3.x python-asyncio

我想做什么

我正在尝试模拟以下简单 socat(1) 的行为命令:

socat tcp-listen:SOME_PORT,fork,reuseaddr exec:'SOME_PROGRAM'

上述命令创建了一个 fork TCP 服务器,该服务器 fork 并执行 SOME_PROGRAM对于每个连接,重定向 stdinstdout将所述命令发送到 TCP 套接字。

这是我想要实现的 :
  • 使用 asyncio 创建一个简单的 TCP 服务器处理多个并发连接。
  • 每当接收到连接时,启动 SOME_PROGRAM作为子进程。
  • 将从套接字接收到的任何数据传递给 SOME_PROGRAM的标准输入。
  • 传递从 SOME_PROGRAM 收到的任何数据的标准输出到套接字。
  • SOME_PROGRAM退出,将告别消息和退出代码一起写入套接字并关闭连接。

  • 我想在纯 Python 中执行此操作,而不使用使用 asyncio 的外部库模块。

    到目前为止我所拥有的

    这是我到目前为止编写的代码(如果它很长,请不要害怕,只是大量注释和间隔):
    import asyncio
    
    class ServerProtocol(asyncio.Protocol):
        def connection_made(self, transport):
            self.client_addr   = transport.get_extra_info('peername')
            self.transport     = transport
            self.child_process = None
    
            print('Connection with {} enstablished'.format(self.client_addr))
    
            asyncio.ensure_future(self._create_subprocess())
    
        def connection_lost(self, exception):
            print('Connection with {} closed.'.format(self.client_addr))
    
            if self.child_process.returncode is not None:
                self.child_process.terminate()
    
        def data_received(self, data):
            print('Data received: {!r}'.format(data))
    
            # Make sure the process has been spawned
            # Does this even make sense? Looks so awkward to me...
            while self.child_process is None:
                continue
    
            # Write any received data to child_process' stdin
            self.child_process.stdin.write(data)
    
        async def _create_subprocess(self):
            self.child_process = await asyncio.create_subprocess_exec(
                *TARGET_PROGRAM,
                stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE
            )
    
            # Start reading child stdout
            asyncio.ensure_future(self._pipe_child_stdout())
    
            # Ideally I would register some callback here so that when
            # child_process exits I can write to the socket a goodbye
            # message and close the connection, but I don't know how
            # I could do that...
    
        async def _pipe_child_stdout(self):
            # This does not seem to work, this function returns b'', that is an
            # empty buffer, AFTER the process exits...
            data = await self.child_process.stdout.read(100) # Arbitrary buffer size
    
            print('Child process data: {!r}'.format(data))
    
            if data:
                # Send to socket
                self.transport.write(data)
                # Reschedule to read more data
                asyncio.ensure_future(self._pipe_child_stdout())
    
    
    SERVER_PORT    = 6666
    TARGET_PROGRAM = ['./test']
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        coro = loop.create_server(ServerProtocol, '0.0.0.0', SERVER_PORT)
        server = loop.run_until_complete(coro)
    
        print('Serving on {}'.format(server.sockets[0].getsockname()))
    
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            pass
    
        server.close()
        loop.run_until_complete(server.wait_closed())
        loop.close()
    

    还有 ./test我试图作为子进程运行的程序:
    #!/usr/bin/env python3
    
    import sys
    
    if sys.stdin.read(2) == 'a\n':
        sys.stdout.write('Good!\n')
    else:
        sys.exit(1)
    
    if sys.stdin.read(2) == 'b\n':
        sys.stdout.write('Wonderful!\n')
    else:
        sys.exit(1)
    
    sys.exit(0)
    

    不幸的是,上面的代码并没有真正起作用,我对接下来要尝试的东西有点迷茫。

    什么按预期工作 :
  • 子进程正确生成,并且似乎也正确接收来自套接字的输入,因为我可以从 htop 看到它我也可以看到,只要我发送 b\n它终止。

  • 什么不按预期工作 :

    其他的基本上都...
  • 子进程的输出永远不会发送到套接字,实际上根本不会读取。来电 await self.child_process.stdout.read(100) 似乎永远不会终止:相反,它只会在子进程死亡后终止,结果只是 b'' (一个空的 bytes 对象)。
  • 我无法理解子进程何时终止:正如我上面提到的,我想向套接字发送一条“再见”消息以及 self.child_process.returncode 当这种情况发生时,但我不知道如何以一种有意义的方式做到这一点。

  • 我试过的 :
  • 我尝试使用 asyncio.loop.subprocess_exec() 创建子进程而不是 asyncio.create_subprocess_exec() .这解决了知道进程何时终止的问题,因为我可以实例化 asyncio.SubprocessProtocol 的子类并使用其 process_exited() 方法,但是 根本没有真正帮助我,因为如果我这样做,我就没有办法与进程交谈'stdinstdout没有了!也就是说,我没有 Process要与之交互的对象...
  • 我试着玩弄 asyncio.loop.connect_write_pipe() loop.connect_read_pipe() 没有运气。

  • 问题

    那么,有人可以帮我弄清楚我做错了什么吗?必须有办法使这项工作顺利进行。当我第一次开始时,我正在寻找一种方法来轻松使用一些管道重定向,但我不知道此时是否可能。是吗?看起来应该是这样。

    我可以使用 fork() 在 15 分钟内用 C 语言编写这个程序, exec()dup2() ,所以我必须缺少一些东西!任何帮助表示赞赏。

    最佳答案

    您的代码有两个直接的实现问题:

  • 服务器在将接收到的数据传输到子进程之前去除空白。这将删除尾部的换行符,因此如果 TCP 客户端发送 "a\n" ,子进程将只收到 "a" .这样子进程永远不会遇到预期的 "a\n"字符串,它总是在读取两个字节后终止。这解释了来自子流程的空字符串 (EOF)。 (剥离已在对该问题的后续编辑中删除。)
  • 子进程不会刷新其输出,因此服务器不会收到任何写入。只有在子进程退出或填满其输出缓冲区时才会看到写入,该缓冲区以千字节为单位,在显示简短的调试消息时需要一段时间来填充。

  • 另一个问题是在设计层面。正如评论中提到的,除非你明确打算实现一个新的 asyncio 协议(protocol),否则它是 recommended坚持上级stream-based API ,在本例中为 start_server 功能。甚至更低级别的功能,如 SubprocessProtocol , connect_write_pipe , 和 connect_read_pipe也不是您想在应用程序代码中使用的东西。这个答案的其余部分假设一个基于流的实现。
    start_server接受一个协程,该协程将在客户端连接时作为新任务产生。它使用两个异步流参数调用,一个用于读取,一个用于写入。协程包含与客户端通信的逻辑;在你的情况下,它会产生子进程并在它和客户端之间传输数据。

    请注意,套接字和子进程之间的双向数据传输无法通过读取后写入的简单循环来实现。例如,考虑这个循环:
    # INCORRECT: can deadlock (and also doesn't detect EOF)
    child = await asyncio.create_subprocess_exec(...)
    while True:
        proc_data = await child.stdout.read(1024)  # (1)
        sock_writer.write(proc_data)
        sock_data = await sock_reader.read(1024)
        child.stdin.write(sock_data)               # (2)
    

    这种循环容易出现死锁。如果子进程正在响应它从 TCP 客户端接收的数据,它有时只会在接收到一些输入后才提供输出。这将无限期地阻塞 (1) 处的循环,因为它可以从 child 的 stdout 中获取数据。只有在送 child 之后才sock_data ,稍后发生在(2)处。实际上,(1)等待(2),反之亦然,构成一个死锁。请注意,颠倒传输顺序无济于事,因为如果 TCP 客户端正在处理服务器子进程的输出,则循环将死锁。

    使用 asyncio 可以轻松避免这种死锁:只需并行生成两个协程,一个将数据从套接字传输到子进程的标准输入,另一个将数据从子进程的标准输出传输到套接字。例如:
    # correct: deadlock-free (and detects EOF)
    async def _transfer(src, dest):
        while True:
            data = await src.read(1024)
            if data == b'':
                break
            dest.write(data)
    
    child = await asyncio.create_subprocess_exec(...)
    loop.create_task(_transfer(child.stdout, sock_writer))
    loop.create_task(_transfer(sock_reader, child.stdin))
    await child.wait()
    

    此设置与第一个 while 的区别循环是两个相互独立的传输。死锁不会发生,因为从套接字读取从不等待从子进程读取,反之亦然。

    应用于这个问题,整个服务器看起来像这样:

    import asyncio
    
    class ProcServer:
        async def _transfer(self, src, dest):
            while True:
                data = await src.read(1024)
                if data == b'':
                    break
                dest.write(data)
    
        async def _handle_client(self, r, w):
            loop = asyncio.get_event_loop()
            print(f'Connection from {w.get_extra_info("peername")}')
            child = await asyncio.create_subprocess_exec(
                *TARGET_PROGRAM, stdin=asyncio.subprocess.PIPE,
                stdout=asyncio.subprocess.PIPE)
            sock_to_child = loop.create_task(self._transfer(r, child.stdin))
            child_to_sock = loop.create_task(self._transfer(child.stdout, w))
            await child.wait()
            sock_to_child.cancel()
            child_to_sock.cancel()
            w.write(b'Process exited with status %d\n' % child.returncode)
            w.close()
    
        async def start_serving(self):
            await asyncio.start_server(self._handle_client,
                                       '0.0.0.0', SERVER_PORT)
    
    SERVER_PORT    = 6666
    TARGET_PROGRAM = ['./test']
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        server = ProcServer()
        loop.run_until_complete(server.start_serving())
        loop.run_forever()
    

    随行test程序也必须修改为调用 sys.stdout.flush() 每一次之后 sys.stdout.write() ,否则消息会留在其 stdio 缓冲区中,而不是发送到父级。

    When I first started I was looking for a way to just effortlessly use some pipe redirection, but I don't know if that's even possible at this point. Is it? It looks like it should be.



    在类 Unix 系统上,当然可以将套接字重定向到生成的子进程,以便子进程直接与客户端对话。 (旧的 inetd Unix 服务器就是这样工作的。)但是 asyncio 不支持这种操作模式,原因有两个:
  • 它不适用于 Python 和 asyncio 支持的所有系统,尤其是在 Windows 上。
  • 它与核心 asyncio 功能不兼容,例如传输/协议(protocol)和流,它们承担对底层套接字的所有权和独占访问。

  • 即使你不关心可移植性,请考虑第二点:你可能需要处理或记录 TCP 客户端和子进程之间交换的数据,如果它们在内核级别焊接在一起,你就不能这样做.此外,与仅处理不透明子进程相比,在 asyncio 协程中实现超时和取消要容易得多。

    如果不可移植性和无法控制通信适合您的用例,那么您可能一开始就不需要 asyncio - 没有什么可以阻止您生成一个运行经典阻塞服务器的线程,该服务器使用相同的方式处理每个客户端 os.fork 的序列, os.dup2 , 和 os.execlp 你会用 C 写的。

    编辑

    正如 OP 在评论中指出的那样,原始代码通过杀死子进程来处理 TCP 客户端断开连接。在流层,连接丢失由流反射(reflect),要么发出文件结束信号,要么引发异常。在上面的代码中,可以通过替换通用 self._transfer() 来轻松应对连接丢失。使用更具体的协程来处理这种情况。例如,而不是:
    sock_to_child = loop.create_task(self._transfer(r, child.stdin))
    

    ...可以写:
    sock_to_child = loop.create_task(self._sock_to_child(r, child))
    

    并定义 _sock_to_child像这样(未经测试):
    async def _sock_to_child(self, reader, child):
        try:
            await self._transfer(reader, child.stdin)
        except IOError as e:
            # IO errors are an expected part of the workflow,
            # we don't want to propagate them
            print('exception:', e)
        child.kill()
    

    如果子进程比 TCP 客户端生命周期长,则 child.kill()行可能永远不会执行,因为协程将被 _handle_client 取消。虽然暂停在 src.read()_transfer() .

    关于python - 使用 asyncio 的简单 Python TCP fork 服务器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55978030/

    相关文章:

    python - 如何在 GAE 的搜索 API 中实现自定义拼写检查

    python - 将 .jpg 转换为 .txt

    python - 为什么在 python 3.5 中使用 asyncio 时出现被忽略的异常

    loop.create_task 和 asyncio.run_coroutine_threadsafe 之间的 Python asyncio 区别

    python - 如何将当前日期转换为纪元时间戳?

    python - 使用 value_count(bins=x) 访问 bin 间隔

    python - 使用 Python 3 打印 HTML 数据

    python - 如何从文件加载多个正则表达式模式并匹配给定的字符串?

    Python 异步流 API

    python - 使用列表中项目的索引来比较它们在列表中出现的顺序