python - 在 asyncio 中的 KeyboardInterrupt 之后等待任务完成

标签 python python-3.4 python-asyncio

我试图了解asyncio 是如何工作的。在我的场景中,客户端与服务器建立 tcp 连接,发送登录字符串(如果经过身份验证) - 接收字符流。最后在 KeyboardInterrupt 上将 logoff 字符串发送到服务器并愉快地断开连接。

目前,我陷入了最后一部分,因为我的注销方法/任务在有机会完成之前就被销毁了。

^CTask was destroyed but it is pending!
source_traceback: Object created at (most recent call last):
  File "tst.py", line 101, in <module>
    client.login()
  File "tst.py", line 29, in login
    logoff_tsk = self.loop.create_task(self.logoff())
task: <Task pending coro=<logoff() running at tst.py:49> cb=[myClass._shutdown()] created at tst.py:29>

下面是产生此错误的代码:

from functools import partial
import asyncio

class myClass:
    def __init__(self, *a, **kw):
        self.transport = None
        self.protocol = None
        self.usr = str(kw.get("usr", ""))
        self.pwd = str(kw.get("pwd", ""))
        self.host = str(kw.get("host", "")) or "127.0.0.2"
        self.port = int(kw.get("port", 0)) or 5038
        self.loop = asyncio.get_event_loop()
        self.loop.set_debug(enabled=True)

    def reactor(self, recv):
        print("## ~/~ From reactor: {!r}".format(recv.decode()))

    def login(self):
        connection_tsk = self.loop.create_task(self.loop.create_connection(
            partial(
                myProtocol,
                reactor_func=self.reactor),
            host=self.host,
            port=self.port))
        connection_tsk.add_done_callback(self.set_transport_protocol)
        try:
            self.loop.run_forever()
        except KeyboardInterrupt:
            logoff_tsk = self.loop.create_task(self.logoff())
            logoff_tsk.add_done_callback(self._shutdown)

    def set_transport_protocol(self, fut):
        print("AmiCtl.set_transport_protocol")
        transport, protocol = fut.result()
        self.transport = transport
        self.protocol = protocol
        self.loop.call_soon(self._login)

    def _login(self):
        login_string = self.cmd("Login")
        self.loop.create_task(self.transmit(login_string))

    @asyncio.coroutine
    def transmit(self, cmd):
        if self.transport:
            print("## ~/~ Sending data: {!r}".format(cmd))
            self.transport.write(cmd)

    @asyncio.coroutine
    def logoff(self):
        command = self.cmd("Logoff")
        yield from asyncio.shield(self.transmit(command))

    def _shutdown(self):
        if self.transport:
            self.transport.close()
        self.loop.stop()
        self.loop.close()
        print("\n{!r}".format(self.loop))

    def cmd(self, action):
        """
        Produce login/logoff string.
        """

class myProtocol(asyncio.Protocol):
    def __init__(self, reactor_func=None):
        self.reactor_func = reactor_func
        self.loop = asyncio.get_event_loop()

    def connection_made(self, transport):
        self.transport = transport
        peername = transport.get_extra_info("peername")
        print("## ~/~ Connection made: {peer}".format(peer=peername))

    def data_received(self, data):
        if callable(self.reactor_func):
            self.reactor_func(data)

    def connection_lost(self, exc):
        print("## ~/~ Lost connection to the server!!")
        self.loop.stop()


client = myClass(usr="my_usr", pwd="my_pwd")
client.login()

如何改进/修复我的代码?

最佳答案

我不会引用你的代码,因为我认为你的问题是一个 xy 问题。 ( http://xyproblem.info ),但我会尝试以更通用的方式回答。我读到的是,您询问如何正确关闭异步应用程序。

我设置了一个小示例,我认为通过一些解释将使您走上正确的道路。阅读下面的代码,我希望它能够与您的用例相关,我将在下面进行解释。

import asyncio
import signal

loop = asyncio.get_event_loop()


class Connector:

    def __init__(self):
        self.closing = False
        self.closed = asyncio.Future()
        task = loop.create_task(self.connection_with_client())
        task.add_done_callback(self.closed.set_result)


    async def connection_with_client(self):
        while not self.closing:
            print('Read/write to open connection')
            await asyncio.sleep(1)

        print('I will now close connection')
        await asyncio.sleep(1)


conn = Connector()


def stop(loop):
    conn.closing = True
    print("from here I will wait until connection_with_client has finished")
    conn.closed.add_done_callback(lambda _: loop.stop())

loop.add_signal_handler(signal.SIGINT, stop, loop)
loop.run_forever()
loop.close()

您所要求的实际上并不是微不足道的,执行 asyncio 时最难管理的事情之一就是正确关闭 asyncio 应用程序。

要进行托管关闭,您必须有一个协程来处理关闭并在确保所有内容都关闭时将 future 设置为完成。在我的示例中,它是 task.add_done_callback(self.lined.set_result) 但这个 future 可以通过其他方式设置。

其次,您必须添加一个信号处理程序,该处理程序将运行非异步(正常)函数并安排回调,以便在“关闭 future ”完成时触发循环关闭/停止。

看看我的代码并尝试一下,直到您理解流程为止。我不怪你问,在我看来,做 asyncio 最困难的事情之一就是跟踪“松散”的协程,这将导致不干净的关闭。

当我第一次做这个“练习”并需要理解原理时,我浏览了 https://github.com/aio-libs/aioredis/blob/master/aioredis/connection.py#L93-L95 的代码 如果您花时间阅读代码,这些人会以非常漂亮的方式处理您的确切问题,我知道代码很复杂,但是喝杯咖啡并遵循方法调用,直到它有意义为止。

我希望这对您有所帮助,如果我不清楚,请发布更多详细信息或评论。并花时间了解这个主题(asycnio shutdown 的事情)。当您掌握了 asyncio 的关闭管理时,您就不再是 asyncio-padawan 了;)

您可能会认为,需要干净关闭大量代码,但据我了解,这就是实现此目的的方法(或多或少是更大、更复杂的应用程序的唯一方法)。

祝你好运。

关于python - 在 asyncio 中的 KeyboardInterrupt 之后等待任务完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35604033/

相关文章:

python - 将 JSON 时间戳字符串转换为 pandas dataframe 中的 python 日期

python - 管理搜索时出现 Django 错误 : Cannot resolve keyword 'username' into field

python - 更新 Tensorboard,同时使用 conda 保持 Tensorflow 旧版

Python3 : ImportError: No module named '_ctypes' when using Value from module multiprocessing

python - 测试 : testing a websocket connection

python - 如何在 __init__ 中使用 await 设置类属性

python - 使用多个工作表创建多个 Excel 文件

python - BeautifulSoup已安装,但miniconda仍然抛出属性错误

python - 收集python模块中的一些特定类

Python asyncio 对await 和task 感到困惑