python - 带有休克数据的 Multiprocessing.Queue 导致 _wait_for_tstate_lock

标签 python python-3.x queue python-multiprocessing python-multithreading

threading._wait_for_tstate_lock 中引发异常当我在 Process 之间传输休数据时和一个 Thread通过 multiprocessing.Queue .
我的最小工作示例首先看起来有点复杂 - 抱歉。我会解释。原始应用程序将大量(不那么重要)文件加载到 RAM 中。这是在单独的过程中完成以节省资源。主 gui 线程不应该卡住。

  • GUI 启动一个单独的 Thread防止 gui 事件循环卡住。
  • 这个单独的 Thread然后开始一个 Process这应该可以完成工作。

  • a) 这个 Thread实例化一个 multiprocess.Queue (请注意,这是 multiprocessing 而不是 threading!)
    b) 这是给 Process用于共享来自 Process 的数据返回 Thread .
  • Process做一些工作(3 个步骤)和 .put()结果进入multiprocessing.Queue .
  • Process结束 Thread再次接管并从 Queue 收集数据,将其存储到自己的属性 MyThread.result .
  • Thread告诉 GUI 主循环/线程在有时间时调用回调函数。
  • 回调函数( MyWindow::callback_thread_finished() )从 MyWindow.thread.result 获取结果.

  • 问题是如果数据放入 Queue是大事发生了我不明白 - MyThread永不结束。我必须通过 Strg+C 取消申请。
    我从文档中得到了一些提示。但我的问题是我没有完全理解文档。但我有一种感觉,我的问题的关键可以在那里找到。
    请参阅“Pipes and Queues”(Python 3.5 文档)中的两个红色 boxex。
    那是完整的输出
    MyWindow::do_start()
    Running MyThread...
    Running MyProcess...
    MyProcess stoppd.
    ^CProcess MyProcess-1:
    Exception ignored in: <module 'threading' from '/usr/lib/python3.5/threading.py'>
    Traceback (most recent call last):
      File "/usr/lib/python3.5/threading.py", line 1288, in _shutdown
        t.join()
      File "/usr/lib/python3.5/threading.py", line 1054, in join
        self._wait_for_tstate_lock()
      File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
        elif lock.acquire(block, timeout):
    KeyboardInterrupt
    Traceback (most recent call last):
      File "/usr/lib/python3.5/multiprocessing/process.py", line 252, in _bootstrap
        util._exit_function()
      File "/usr/lib/python3.5/multiprocessing/util.py", line 314, in _exit_function
        _run_finalizers()
      File "/usr/lib/python3.5/multiprocessing/util.py", line 254, in _run_finalizers
        finalizer()
      File "/usr/lib/python3.5/multiprocessing/util.py", line 186, in __call__
        res = self._callback(*self._args, **self._kwargs)
      File "/usr/lib/python3.5/multiprocessing/queues.py", line 198, in _finalize_join
        thread.join()
      File "/usr/lib/python3.5/threading.py", line 1054, in join
        self._wait_for_tstate_lock()
      File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
        elif lock.acquire(block, timeout):
    KeyboardInterrupt
    
    这是最小的工作示例
    #!/usr/bin/env python3
    
    import multiprocessing
    import threading
    import time
    import gi
    gi.require_version('Gtk', '3.0')
    from gi.repository import Gtk
    from gi.repository import GLib
    
    
    class MyThread (threading.Thread):
        """This thread just starts the process."""
        def __init__(self, callback):
            threading.Thread.__init__(self)
            self._callback = callback
    
        def run(self):
            print('Running MyThread...')
            self.result = []
    
            queue = multiprocessing.Queue()
            process = MyProcess(queue)
            process.start()
            process.join()
    
            while not queue.empty():
                process_result = queue.get()
                self.result.append(process_result)
            print('MyThread stoppd.')
            GLib.idle_add(self._callback)
    
    
    class MyProcess (multiprocessing.Process):
        def __init__(self, queue):
            multiprocessing.Process.__init__(self)
            self.queue = queue
    
        def run(self):
            print('Running MyProcess...')
            for i in range(3):
                self.queue.put((i, 'x'*102048))
            print('MyProcess stoppd.')
    
    class MyWindow (Gtk.Window):
        def __init__(self):
            Gtk.Window.__init__(self)
            self.connect('destroy', Gtk.main_quit)
            GLib.timeout_add(2000, self.do_start)
    
        def do_start(self):
            print('MyWindow::do_start()')
            # The process need to be started from a separate thread
            # to prevent the main thread (which is the gui main loop)
            # from freezing while waiting for the process result.
            self.thread = MyThread(self.callback_thread_finished)
            self.thread.start()
    
        def callback_thread_finished(self):
            result = self.thread.result
            for r in result:
                print('{} {}...'.format(r[0], r[1][:10]))
    
    if __name__ == '__main__':
        win = MyWindow()
        win.show_all()
        Gtk.main()
    
    可能重复但完全不同,IMO 没有对我的情况做出回答:Thread._wait_for_tstate_lock() never returns .
    解决方法
    通过将第 22 行修改为 queue = multiprocessing.Manager().Queue() 来使用管理器解决这个问题。但我不知道为什么。我问这个问题的目的是了解背后的东西,而不仅仅是让我的代码工作。甚至我真的不知道什么是 Manager()是,如果它有其他(引起问题的)影响。

    最佳答案

    根据您链接到的文档中的第二个警告框,在处理队列中的所有项目之前加入进程时,您可能会遇到死锁。所以开始这个过程并立即加入它然后处理队列中的项目是错误的步骤顺序。您必须启动该过程,然后接收项目,然后只有在接收到所有项目后才能调用 join 方法。定义一些哨兵值以表示进程已完成通过队列发送数据。 None例如,如果这不是您期望从流程中获得的常规值。

    class MyThread(threading.Thread):
        """This thread just starts the process."""
    
        def __init__(self, callback):
            threading.Thread.__init__(self)
            self._callback = callback
            self.result = []
    
        def run(self):
            print('Running MyThread...')
            queue = multiprocessing.Queue()
            process = MyProcess(queue)
            process.start()
            while True:
                process_result = queue.get()
                if process_result is None:
                    break
                self.result.append(process_result)
            process.join()
            print('MyThread stoppd.')
            GLib.idle_add(self._callback)
    
    
    class MyProcess(multiprocessing.Process):
    
        def __init__(self, queue):
            multiprocessing.Process.__init__(self)
            self.queue = queue
    
        def run(self):
            print('Running MyProcess...')
            for i in range(3):
                self.queue.put((i, 'x' * 102048))
            self.queue.put(None)
            print('MyProcess stoppd.')
    

    关于python - 带有休克数据的 Multiprocessing.Queue 导致 _wait_for_tstate_lock,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56321756/

    相关文章:

    python - 如何在python中不同的两个数组上应用不同的运算符

    python - 什么是 Python 3 中的 find()?

    python - 确定 ThreadPool 何时完成处理队列

    用于存储最新值的 C++ 固定大小容器

    python - django 有超过 1 个外键错误

    python - 有没有一种更干净、更有效的方法来用 Python 制作这些 BMI 和 Fat% 计算器?

    python - 分析多进程 Python 脚本时出现神秘的 pickle 错误

    c++ - 在数组中插入多个元素——计时函数

    python - 在Python中比较两个文本文件

    python - Python 中的 DICOM 切片厚度