我目前正在尝试编写一个由 Raspberry Pi 控制的机器人 ARM 。
到目前为止,一切工作正常,除了一件事,我已经在谷歌上搜索并尝试了很多小时,但找不到有效的解决方案。
为了移动机器人 ARM ,需要使用线程“同时”运行所有电机(工作正常)。
我遇到的问题是,我需要更新一个标签,该标签在轴(电机)完成运动后立即显示其当前角度,但其他电机仍在运行(线程)。
经过大量研究,我认为我通过使用队列和 Tkinters 后方法找到了解决方案。但它仍然不起作用,因为标签文本仅在所有线程终止后才会更新。
我编写了一个示例代码,我希望获得电机“一”的标签更新,该更新将在电机“二”(500 次迭代)之前完成其 for 循环(100 次迭代)。我预计当一号电机达到目标而二号电机仍在运行时,标签就会立即更新。
但是虽然我使用了后置方法,但它仍然要等到电机二完成后才更新标签。
希望你能帮助我!
from tkinter import *
import threading
import time
from queue import *
class StepperMotors:
def __init__(self, root):
self.root = root
self.start_btn = Button(root, text="Start", command=lambda:self.start_movement())
self.start_btn.config(width = 10)
self.start_btn.grid(row=1,column=1)
self.label_one = Label(root, text='')
self.label_one.config(width = 10)
self.label_one.grid(row=2, column=1)
self.label_two = Label(root, text='')
self.label_two.config(width = 10)
self.label_two.grid(row=3, column=1)
def start_movement(self):
self.thread_queue = Queue()
self.root.after(100, self.wait_for_finish)
thread_one = threading.Thread(target=self.motor_actuation, args=(1,100))
thread_two = threading.Thread(target=self.motor_actuation, args=(2,500))
thread_one.start()
thread_two.start()
thread_one.join()
thread_two.join()
def motor_actuation(self, motor, iterations):
for i in range(iterations):
i = i+1
update_text = str(motor) + " " + str(i) + "\n"
print(update_text)
time.sleep(0.01)
self.thread_queue.put(update_text)
def wait_for_finish(self):
try:
self.text = self.thread_queue.get()
self.label_one.config(text=self.text)
except self.thread_queue.empty():
self.root.after(100, self.wait_for_finish)
if __name__ == "__main__":
root = Tk()
root.title("test")
stepper = StepperMotors(root)
root.mainloop()
最佳答案
最好使用非阻塞的守护线程。
此外,最好有一个 separation of concerns :机器人(或机器人 ARM )可以是一个有自己生命周期的对象:守护线程。同上,您可以定义一个“LabelUpdater”来读取机器人的状态并更新标签。
让我们定义一个机器人:
- 它是在应用程序初始化时创建的,并在用户单击“开始”按钮时运行,
- 机器人在应用级多线程队列中移动并报告其角度,
class Robot(threading.Thread):
def __init__(self, name: str, label_queue: queue.Queue, end_pos: int):
super().__init__(name=name)
self.daemon = True
self.label_queue = label_queue
self.end_pos = end_pos
def run(self) -> None:
for angle in range(self.end_pos):
self.label_queue.put(angle)
time.sleep(0.01)
让我们定义一个LabelUpdater:
- 它是在应用程序初始化时创建的,并永远运行(即使机器人没有运行,它也可以观察它)。
- 它读取机器人队列(例如每秒一次以避免闪烁)并更新标签
class LabelUpdater(threading.Thread):
def __init__(self, name: str, label_queue: queue.Queue, root_app: tkinter.Tk, variable: tkinter.Variable):
super().__init__(name=name)
self.daemon = True
self.label_queue = label_queue
self.root_app = root_app
self.variable = variable
def run(self) -> None:
# run forever
while True:
# wait a second please
time.sleep(1)
# consume all the queue and keep only the last message
last_msg = None
while True:
try:
msg = self.label_queue.get(block=False)
except queue.Empty:
break
last_msg = msg
self.label_queue.task_done()
if last_msg:
self.variable.set(last_msg)
然后,主应用程序应该定义:
- 2 个多线程队列:每个标签一个,
- 2 将更新的
tkinter.StringVar
变量, - 机器人和标签更新器,
- 这两个更新程序已启动,并将永远运行。
class StepperMotors:
def __init__(self, root):
self.root = root
self.label_one_queue = queue.Queue()
self.label_two_queue = queue.Queue()
self.start_btn = tkinter.Button(root, text="Start", command=lambda: self.start_movement())
self.start_btn.config(width=10)
self.start_btn.grid(row=1, column=1)
self.text_one = tkinter.StringVar()
self.text_one.set("one")
self.label_one = tkinter.Label(root, textvariable=self.text_one)
self.label_one.config(width=10)
self.label_one.grid(row=2, column=1)
self.text_two = tkinter.StringVar()
self.text_two.set("two")
self.label_two = tkinter.Label(root, textvariable=self.text_two)
self.label_two.config(width=10)
self.label_two.grid(row=3, column=1)
self.robot_one = Robot("robot_one", self.label_one_queue, 100)
self.robot_two = Robot("robot_two", self.label_two_queue, 500)
self.updater_one = LabelUpdater("updater_one", self.label_one_queue, self.root, self.text_one)
self.updater_two = LabelUpdater("updater_two", self.label_two_queue, self.root, self.text_two)
self.updater_one.start()
self.updater_two.start()
def start_movement(self):
self.robot_one.start()
self.robot_two.start()
当然,您需要一个标志或其他东西来检查每个机器人是否尚未运行。
关于python - 在 Python 线程化过程中更新 Tkinter 标签,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60126741/