linux - 如何使用 urwid 和 asyncio 使长任务非阻塞?

标签 linux python-3.5 nonblocking urwid

我正在编写一个 Python curses 应用程序,它通过进程的 stdinstdout 分别发送和接收字符串来控制外部(Linux,如果有帮助的话)进程.该接口(interface)使用 urwid。我已经为一些 urwid 组件编写了一个类来控制外部进程和其他几个类。

我还有一个按钮,用于向外部进程发送命令。然而,该进程不会立即响应,它的任务通常需要几秒钟,在此期间我希望界面不要卡住。

这是我运行子进程的方式:

def run(self, args):
    import io, fcntl, os
    from subprocess import Popen, PIPE

    # Run wpa_cli with arguments, use a thread to feed the process with an input queue
    self._pss = Popen(["wpa_cli"] + args, stdout=PIPE, stdin=PIPE)
    self.stdout = io.TextIOWrapper(self._pss.stdout, encoding="utf-8")
    self.stdin = io.TextIOWrapper(self._pss.stdin, encoding="utf-8", line_buffering=True)

    # Make the process' stdout a non-blocking file
    fd = self.stdout.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    ...

我不得不让进程的输出流成为非阻塞的,以便能够解析它的输出。我不知道这对我的问题是否重要。

以下是我用来控制子进程输入和输出流的方法:

def read(self, parser=None, transform=None, sentinel='>'):
    """ Reads from the controlled process' standard output until a sentinel
    is found. Optionally execute a callable object with every line. Parsed
    lines are placed in a list, which the function returns upon exiting. """

    if not transform:
        transform = lambda str: str

    def readline():
        return transform(self.stdout.readline().strip())

    # Keep a list of (non-empty) parsed lines
    items = []
    for line in iter(readline, sentinel):
        if callable(parser):
            item = parser(line)
            if item is not None:
                items.append(item)
    return items

def send(self, command, echo=True):
    """ Sends a command to the controlled process. Action commands are
    echoed to the standard output. Argument echo controls whether or not
    they're removed by the reader function before parsing. """

    print(command, file=self.stdin)

    # Should we remove the echoed command?
    if not echo:
        self.read(sentinel=command)

我谈到的按钮只是从主脚本入口函数设置了回调。该回调应该向子进程发送命令并循环遍历生成的输出行,直到找到给定的文本,在这种情况下回调函数退出。在此之前,该过程会输出一些我需要捕捉并显示在用户界面中的有趣信息。

例如:

def button_callback():
    # This is just an illustration

    filter = re.compile('(event1|event2|...)')

    def formatter(text):
        try:
            return re.search(filter, text).group(1)
        except AttributeError:
            return text

    def parser(text):
        if text == 'event1':
            # Update the info Text accordingly
        if text == 'event2':
            # Update the info Text accordingly

    controller.send('command')
    controller.read(sentinel='beacon', parser=parser, transform=formatter)

需要注意的是:

  • read() 函数自旋(我找不到其他方法),即使在从(可选的)已解析行中读取哨兵值之前,进程输出流是静默的,
  • 直到按钮回调函数退出,urwid 界面才会刷新,这会阻止 urwid 的主循环刷新屏幕。

我可以使用线程,但据我所知,urwid 支持 asyncio,这就是我想要实现的。你可以说我笨,因为即使浏览了 urwid asyncio 示例并阅读了 Python asyncio 文档,我也无法清楚地弄清楚是怎么回事。

鉴于有改变这些方法的空间,我仍然希望保留进程控制类——即包含 read()send() 的类— 尽可能通用。

到目前为止,我没有尝试在进程繁忙时更新界面。接收进程“通知”的组件是一个普通的 urwid.Text() 小部件。

最佳答案

首先要做两件事:

  • 您不一定需要 asyncio 来使用 urwid 执行异步操作,因为它已经有一个 simple event loop。这通常已经足够好了,它具有用于处理许多 IO 场景的原语。

  • 编写异步代码时,您需要注意编写同步代码,例如那些循环直到找到标记值的函数,因为它会阻止任何其他代码(包括事件循环本身)执行:这意味着UI 将卡住,直到该函数返回

对于您的情况,您可以使用默认的简单事件循环并使用 MainLoop.watch_pipe方法,它创建一个准备好在子进程中使用的管道(已经将其置于异步/非阻塞模式,顺便说一句:))并在有新数据写入管道时调用回调。

这是一个使用它的简单示例,显示了 shell 命令的输出,同时保持 UI 不阻塞(注意,因为懒惰而使用了一些全局变量):

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import print_function, absolute_import, division
import subprocess
import urwid


def show_or_exit(key):
    if key in ('q', 'Q', 'esc'):
        raise urwid.ExitMainLoop()


def update_text(read_data):
    text.set_text(text.text + read_data)


def enter_idle():
    loop.remove_watch_file(pipe.stdout)


if __name__ == '__main__':
    widget = urwid.Pile([
        urwid.Button('Here is a button'),
        urwid.Button('And here another button'),
        urwid.Button('One more, just to be sure'),
        urwid.Button("Heck, let's add yet another one!"),
    ])
    text = urwid.Text('PROCESS OUTPUT:\n')
    widget = urwid.Columns([widget, text])
    widget = urwid.Filler(widget, 'top')

    loop = urwid.MainLoop(widget, unhandled_input=show_or_exit)
    stdout = loop.watch_pipe(update_text)
    stderr = loop.watch_pipe(update_text)
    pipe = subprocess.Popen('for i in $(seq 50); do echo -n "$i "; sleep 0.5; done',
                            shell=True, stdout=stdout, stderr=stderr)
    loop.run()

请注意回调 update_text 中的代码如何没有理由阻塞:它获取已读取的数据,更新组件,仅此而已。没有 while 循环等待其他事情发生。

在您的情况下,您可能需要调整解析 wpa_cli 输出的函数,以便它们也没有理由阻止。例如,他们可以设置一些变量或以其他方式发出信号,而不是等待循环直到找到值。

我希望这是有道理的,如果您需要澄清某些事情,请告诉我! :)

关于linux - 如何使用 urwid 和 asyncio 使长任务非阻塞?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48931134/

相关文章:

c - 此 C 示例代码中的 CPU 符号是如何解析的?

linux - 在 Linux 终端中更改 DIR 颜色

python - 生成连接范围的 1D​​ NumPy 数组

对 OpenSSL 非阻塞 I/O 感到困惑

c - 如何在C中将阻塞文件io转换为非阻塞文件

java - Spring webflux非阻塞响应

linux - 两个文件中的唯一值

r - ‘sf’(R 包)的命名空间加载失败,无法加载共享对象

python-asyncio - 在 asyncio 中混合异步上下文管理器和直接等待

python - 'await future' 和 'await asyncio.wait_for(future, None)' 之间有区别吗?