在 threading._wait_for_tstate_lock
中引发异常当我在 Process
之间传输休数据时和一个 Thread
通过 multiprocessing.Queue
.
我的最小工作示例首先看起来有点复杂 - 抱歉。我会解释。原始应用程序将大量(不那么重要)文件加载到 RAM 中。这是在单独的过程中完成以节省资源。主 gui 线程不应该卡住。
Thread
防止 gui 事件循环卡住。Thread
然后开始一个 Process
这应该可以完成工作。a) 这个
Thread
实例化一个 multiprocess.Queue
(请注意,这是 multiprocessing
而不是 threading
!)b) 这是给
Process
用于共享来自 Process
的数据返回 Thread
.Process
做一些工作(3 个步骤)和 .put()
结果进入multiprocessing.Queue
.Process
结束 Thread
再次接管并从 Queue
收集数据,将其存储到自己的属性 MyThread.result
.Thread
告诉 GUI 主循环/线程在有时间时调用回调函数。MyWindow::callback_thread_finished()
)从 MyWindow.thread.result
获取结果.问题是如果数据放入
Queue
是大事发生了我不明白 - MyThread
永不结束。我必须通过 Strg+C 取消申请。我从文档中得到了一些提示。但我的问题是我没有完全理解文档。但我有一种感觉,我的问题的关键可以在那里找到。
请参阅“Pipes and Queues”(Python 3.5 文档)中的两个红色 boxex。
那是完整的输出
MyWindow::do_start()
Running MyThread...
Running MyProcess...
MyProcess stoppd.
^CProcess MyProcess-1:
Exception ignored in: <module 'threading' from '/usr/lib/python3.5/threading.py'>
Traceback (most recent call last):
File "/usr/lib/python3.5/threading.py", line 1288, in _shutdown
t.join()
File "/usr/lib/python3.5/threading.py", line 1054, in join
self._wait_for_tstate_lock()
File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
Traceback (most recent call last):
File "/usr/lib/python3.5/multiprocessing/process.py", line 252, in _bootstrap
util._exit_function()
File "/usr/lib/python3.5/multiprocessing/util.py", line 314, in _exit_function
_run_finalizers()
File "/usr/lib/python3.5/multiprocessing/util.py", line 254, in _run_finalizers
finalizer()
File "/usr/lib/python3.5/multiprocessing/util.py", line 186, in __call__
res = self._callback(*self._args, **self._kwargs)
File "/usr/lib/python3.5/multiprocessing/queues.py", line 198, in _finalize_join
thread.join()
File "/usr/lib/python3.5/threading.py", line 1054, in join
self._wait_for_tstate_lock()
File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
elif lock.acquire(block, timeout):
KeyboardInterrupt
这是最小的工作示例#!/usr/bin/env python3
import multiprocessing
import threading
import time
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from gi.repository import GLib
class MyThread (threading.Thread):
"""This thread just starts the process."""
def __init__(self, callback):
threading.Thread.__init__(self)
self._callback = callback
def run(self):
print('Running MyThread...')
self.result = []
queue = multiprocessing.Queue()
process = MyProcess(queue)
process.start()
process.join()
while not queue.empty():
process_result = queue.get()
self.result.append(process_result)
print('MyThread stoppd.')
GLib.idle_add(self._callback)
class MyProcess (multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
print('Running MyProcess...')
for i in range(3):
self.queue.put((i, 'x'*102048))
print('MyProcess stoppd.')
class MyWindow (Gtk.Window):
def __init__(self):
Gtk.Window.__init__(self)
self.connect('destroy', Gtk.main_quit)
GLib.timeout_add(2000, self.do_start)
def do_start(self):
print('MyWindow::do_start()')
# The process need to be started from a separate thread
# to prevent the main thread (which is the gui main loop)
# from freezing while waiting for the process result.
self.thread = MyThread(self.callback_thread_finished)
self.thread.start()
def callback_thread_finished(self):
result = self.thread.result
for r in result:
print('{} {}...'.format(r[0], r[1][:10]))
if __name__ == '__main__':
win = MyWindow()
win.show_all()
Gtk.main()
可能重复但完全不同,IMO 没有对我的情况做出回答:Thread._wait_for_tstate_lock() never returns .解决方法
通过将第 22 行修改为
queue = multiprocessing.Manager().Queue()
来使用管理器解决这个问题。但我不知道为什么。我问这个问题的目的是了解背后的东西,而不仅仅是让我的代码工作。甚至我真的不知道什么是 Manager()
是,如果它有其他(引起问题的)影响。
最佳答案
根据您链接到的文档中的第二个警告框,在处理队列中的所有项目之前加入进程时,您可能会遇到死锁。所以开始这个过程并立即加入它然后处理队列中的项目是错误的步骤顺序。您必须启动该过程,然后接收项目,然后只有在接收到所有项目后才能调用 join 方法。定义一些哨兵值以表示进程已完成通过队列发送数据。 None
例如,如果这不是您期望从流程中获得的常规值。
class MyThread(threading.Thread):
"""This thread just starts the process."""
def __init__(self, callback):
threading.Thread.__init__(self)
self._callback = callback
self.result = []
def run(self):
print('Running MyThread...')
queue = multiprocessing.Queue()
process = MyProcess(queue)
process.start()
while True:
process_result = queue.get()
if process_result is None:
break
self.result.append(process_result)
process.join()
print('MyThread stoppd.')
GLib.idle_add(self._callback)
class MyProcess(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
print('Running MyProcess...')
for i in range(3):
self.queue.put((i, 'x' * 102048))
self.queue.put(None)
print('MyProcess stoppd.')
关于python - 带有休克数据的 Multiprocessing.Queue 导致 _wait_for_tstate_lock,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56321756/