python - urwid - 无限循环的输出屏幕

标签 python multithreading urwid

我正在尝试使一个简单的 urwid 成为无限循环的输出屏幕。它需要输出来自另一个类的数据。

我现在找到的解决方案是:拥有一个带有 queue 属性的 Printer 类(实际输出类的测试替换器)。当它需要显示某些内容时,它会将其附加到队列。然后,有一个 Interface 类——实际的接口(interface)——有它自己的 Printer 实例。与 MainLoop 并行运行的线程检查队列是否有项目,如果有,则输出它们。由于 Printer 的 main 函数是一个无限循环,它也有自己的线程 - 在这个测试中,它只是每隔几秒输出一次“Hello”。

代码如下:

import urwid
import threading
import time

class Interface:
    palette = [
        ('body', 'white', 'black'),
        ('ext', 'white', 'dark blue'),
        ('ext_hi', 'light cyan', 'dark blue', 'bold'),
        ]

    header_text = [
        ('ext_hi', 'ESC'), ':quit        ',
        ('ext_hi', 'UP'), ',', ('ext_hi', 'DOWN'), ':scroll',
        ]

    def __init__(self):
        self.header = urwid.AttrWrap(urwid.Text(self.header_text), 'ext')
        self.flowWalker = urwid.SimpleListWalker([])
        self.body = urwid.ListBox(self.flowWalker)
        self.footer = urwid.AttrWrap(urwid.Edit("Edit:  "), 'ext')
        self.view = urwid.Frame(
            urwid.AttrWrap(self.body, 'body'),
            header = self.header,
            footer = self.footer)
        self.loop = urwid.MainLoop(self.view, self.palette, 
            unhandled_input = self.unhandled_input)
        self.printer = Printer()

    def start(self):
        t1 = threading.Thread(target = self.fill_screen)
        t1.daemon = True
        t2 = threading.Thread(target = self.printer.fill_queue)
        t2.daemon = True
        t1.start()
        t2.start()
        self.loop.run()

    def unhandled_input(self, k):
        if k == 'esc':
            raise urwid.ExitMainLoop()

    def fill_screen(self):  
        while True:
            if self.printer.queue:
                self.flowWalker.append(urwid.Text(('body', self.printer.queue.pop(0))))
                try:
                    self.loop.draw_screen()
                    self.body.set_focus(len(self.flowWalker)-1, 'above')
                except AssertionError: pass

    def to_screen(self, text):
        self.queue.append(text)


class Printer:
    def __init__(self):
        self.message = 'Hello'
        self.queue = []

    def fill_queue(self):
        while 1:
            self.queue.append(self.message)
            time.sleep(2)


if __name__ == '__main__':
    i = Interface()
    i.start()

它有效,但对我来说太乱了,我担心它最终会成为某种编码恐怖。有没有更简单的方法来完成任务?

最佳答案

如果您需要 外部线程,请考虑以下代码。它建立一个队列,启动一个“发送当前时间到队列”线程,然后运行主界面。接口(interface)不时检查共享队列,并根据需要更新自身。

当界面退出时,外层代码示意线程礼貌退出。

来源

import logging, Queue, sys, threading, time

import urwid

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)-4s %(threadName)s %(message)s", 
    datefmt="%H:%M:%S",
    filename='trace.log',
)

class Interface:
    palette = [
        ('body', 'white', 'black'),
        ('ext', 'white', 'dark blue'),
        ('ext_hi', 'light cyan', 'dark blue', 'bold'),
        ]

    header_text = [
        ('ext_hi', 'ESC'), ':quit        ',
        ('ext_hi', 'UP'), ',', ('ext_hi', 'DOWN'), ':scroll',
        ]

    def __init__(self, msg_queue):
        self.header = urwid.AttrWrap(urwid.Text(self.header_text), 'ext')
        self.flowWalker = urwid.SimpleListWalker([])
        self.body = urwid.ListBox(self.flowWalker)
        self.footer = urwid.AttrWrap(urwid.Edit("Edit:  "), 'ext')
        self.view = urwid.Frame(
            urwid.AttrWrap(self.body, 'body'),
            header = self.header,
            footer = self.footer)
        self.loop = urwid.MainLoop(self.view, self.palette, 
            unhandled_input = self.unhandled_input)
        self.msg_queue = msg_queue
        self.check_messages(self.loop, None)

    def unhandled_input(self, k):
        if k == 'esc':
            raise urwid.ExitMainLoop()

    def check_messages(self, loop, *_args):
        """add message to bottom of screen"""
        loop.set_alarm_in(
            sec=0.5,
            callback=self.check_messages,
            )
        try:
            msg = self.msg_queue.get_nowait()
        except Queue.Empty:
            return
        self.flowWalker.append(
            urwid.Text(('body', msg))
            )
        self.body.set_focus(
            len(self.flowWalker)-1, 'above'
            )

def update_time(stop_event, msg_queue):
    """send timestamp to queue every second"""
    logging.info('start')
    while not stop_event.wait(timeout=1.0):
        msg_queue.put( time.strftime('time %X') )
    logging.info('stop')

if __name__ == '__main__':

    stop_ev = threading.Event()
    message_q = Queue.Queue()

    threading.Thread(
        target=update_time, args=[stop_ev, message_q],
        name='update_time',
    ).start()

    logging.info('start')
    Interface(message_q).loop.run()
    logging.info('stop')

    # after interface exits, signal threads to exit, wait for them
    logging.info('stopping threads')

    stop_ev.set()
    for th in threading.enumerate():
        if th != threading.current_thread():
            th.join()

关于python - urwid - 无限循环的输出屏幕,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25404656/

相关文章:

python - pandas.DataFrame.replace() 得到 "OverflowError: Python int too large to convert to C long"

python - Python 3 中变量的简单表达式

java - 你能在不同的线程上调用相同的方法吗?

multithreading - 为每个线程分配一个面板 - Delphi

python - urwid 中是否有焦点改变事件?

python - urwid 列表框不会滚动

Python - 具有动态表名的 MySQL SELECT 查询

python - 在python中,我想根据字典值的字典对字典进行排序

java - 在线程更新时读取 boolean 值数组

Python 控制台 UI 建议