python - QThreadPool - 如何中断/如何明智地使用 waitForDone 方法

标签 python python-3.x pyqt pyqt4 qthread

背景:

我有一个脚本,允许我通过来自私有(private)编辑器的 API 对 PostgreSQL 数据库进行空间查询(我无法直接查询数据库)。该 API 适用于 python 3.2。简而言之,该脚本用于在所需的地理范围内下载该数据库的元素。根据区域的不同,您可以获得 1 到 100 多个元素,每个元素的大小都非常不同(从 Ko 到 Go)。

主窗口允许您设置所有选项,然后启动全局进程。启动后,会出现一个控制台窗口,让您查看发生了什么。下载项目后,控制台上会显示一个简短的“报告”。目前,一切都是一次一个元素按顺序完成的。正如您可以想象的,如果这个元素相当大,控制台会在等待下载过程结束时卡住。

代码:

我不会在这里发布完整的脚本,但通过一个非常简单的脚本,我将尝试展示我试图解决的主要问题(即避免锁定用户界面/拥有某种实时输出正在发生的事情)。

因此,为了避免这些卡住问题,在我看来,使用线程是最好的解决方案。为了模拟下载过程(请参阅上一章),我使用了具有多个 url(指向不同大小的文件)的 url.request urlretrieve 方法。

import os
import sys
import time
import urllib.request
from PyQt4 import QtCore, QtGui

url_1m = 'http://ipv4.sbg.proof.ovh.net/files/1Mio.dat'
url_10m = 'http://ipv4.sbg.proof.ovh.net/files/10Mio.dat'
url_100m = 'http://ipv4.sbg.proof.ovh.net/files/100Mio.dat'
url_1g = 'http://ipv4.sbg.proof.ovh.net/files/1Gio.dat'
url_10g = 'http://ipv4.sbg.proof.ovh.net/files/10Gio.dat'

urls = (url_1m, url_10m, url_100m, url_1g, url_10g)


# ---------------------------------------------------------------------------------
class DownloadWorkerSignals(QtCore.QObject):
    """
    Defines the signals available from a running download worker thread.
    """
    finished = QtCore.pyqtSignal(str)


# ---------------------------------------------------------------------------------
class DownloadWorker(QtCore.QRunnable):
    """
    Worker thread
    """

    def __init__(self, url, filepath, filename, index):
        super(DownloadWorker, self).__init__()

        self.url = url
        self.file_path = filepath
        self.filename = filename
        self.index = index

        self.signals = DownloadWorkerSignals()

    @QtCore.pyqtSlot(str)
    def run(self):
        t = time.time()
        message = 'Thread %d started\n' % self.index
        try:
            # The urlretrieve method will copy a network object to a local file
            urllib.request.urlretrieve(url=self.url,
                                       filename=os.path.join(self.file_path,
                                                             self.filename))
        except IOError as error:
            message += str(error) + '\n'
        finally:
            message += 'Thread %d ended %.2f s\n' % (self.index, time.time() - t)
            self.signals.finished.emit(message)  # Done


# ---------------------------------------------------------------------------------
class Main(QtGui.QMainWindow):
    """
    Main window
    """

    def __init__(self):
        super(self.__class__, self).__init__()

        self.resize(400, 200)
        self.setWindowTitle("Main")
        self.setWindowModality(QtCore.Qt.ApplicationModal)

        self.centralwidget = QtGui.QWidget(self)
        self.setCentralWidget(self.centralwidget)

        # Ok / Close
        # -------------------------------------------------------------------------
        self.buttonBox = QtGui.QDialogButtonBox(self.centralwidget)
        self.buttonBox.setStandardButtons(QtGui.QDialogButtonBox.Cancel | 
                                          QtGui.QDialogButtonBox.Ok)
        self.buttonBox.setGeometry(QtCore.QRect(10, 160, 380, 20))

        # Connect definition
        # -------------------------------------------------------------------------
        self.connect(self.buttonBox, 
                     QtCore.SIGNAL('accepted()'), 
                     self.button_ok_clicked)
        self.connect(self.buttonBox, 
                     QtCore.SIGNAL('rejected()'), 
                     self.button_cancel_clicked)

    # Connect functions
    # -----------------------------------------------------------------------------
    def button_cancel_clicked(self):
        self.close()

    def button_ok_clicked(self):
        # Launch console
        console = Console(parent=self)
        console.exec_()


# ---------------------------------------------------------------------------------------------------------------
class Console(QtGui.QDialog):
    """
    Console window
    """

    def __init__(self, parent):
        super(self.__class__, self).__init__()

        self.parent = parent

        self.resize(400, 200)
        self.setWindowTitle("Console")
        self.setModal(True)

        self.verticalLayout = QtGui.QVBoxLayout(self)

        # Text edit
        # -------------------------------------------------------------------------
        self.text_edit = QtGui.QPlainTextEdit(self)
        self.text_edit.setReadOnly(True)
        self.text_edit_cursor = QtGui.QTextCursor(self.text_edit.document())
        self.verticalLayout.addWidget(self.text_edit)

        # Ok / Close
        # -------------------------------------------------------------------------
        self.button_box = QtGui.QDialogButtonBox(self)
        self.button_box.setStandardButtons(QtGui.QDialogButtonBox.Close)
        self.verticalLayout.addWidget(self.button_box)

        # Connect definition
        # -------------------------------------------------------------------------
        self.connect(self.button_box.button(QtGui.QDialogButtonBox.Close), 
                     QtCore.SIGNAL('clicked()'),
                     self.button_cancel_clicked)

        # Post initialization
        # -------------------------------------------------------------------------
        self.threadpool = QtCore.QThreadPool()
        self.threadpool.setMaxThreadCount(2)

        for index, url in enumerate(urls):
            worker = DownloadWorker(url=url,
                                    filepath='C:\\Users\\philippe\\Downloads',
                                    filename='url_%d.txt' % index,
                                    index=index)
            worker.signals.finished.connect(self.write_message)
            self.threadpool.start(worker)

        '''
        I have to wait for the end of the thread pool to make a post-processing.
        If I use the waitForDone I don't see my console until the all work is done 
        '''
        # self.threadpool.waitForDone()
        # self.write_stram('Thread pool finished')

    # Connect functions
    # -----------------------------------------------------------------------------
    def button_cancel_clicked(self):
        if self.threadpool.activeThreadCount() != 0:
            pass  # How to interrupt the threadpool ?
        self.close()

    @QtCore.pyqtSlot(str)
    def write_message(self, text):
        self.text_edit.insertPlainText(text)
        cursor = self.text_edit.textCursor()
        self.text_edit.setTextCursor(cursor)


# ---------------------------------------------------------------------------------
if __name__ == '__main__':
    app = QtGui.QApplication(sys.argv)
    window = Main()
    window.show()
    app.exec_()

问题:

一切似乎都按预期进行,但我遇到了两个困难:

  1. 在线程池过程结束时我必须做一些 后期处理。如果我使用 waitForDone 方法,我看不到我的 控制台直到所有工作完成并且这不是行为类型 通缉。
  2. 如果单击控制台中的取消按钮,我需要中断 线程池,我不知道如何管理它。

最佳答案

我再次审视了这个问题(主要基于此: how-do-i-maintain-a-resposive-gui-using-qthread-with-pyqgis )。

所以我把之前串联的QThreadPool/QRunnable,换成了Queue/QThread。下面的代码给出了概述。

import os
import sys
import time
import urllib.request
import queue
from PyQt4 import QtCore, QtGui

url_1m = 'http://ipv4.sbg.proof.ovh.net/files/1Mio.dat'
url_10m = 'http://ipv4.sbg.proof.ovh.net/files/10Mio.dat'
url_100m = 'http://ipv4.sbg.proof.ovh.net/files/100Mio.dat'
url_1g = 'http://ipv4.sbg.proof.ovh.net/files/1Gio.dat'
url_10g = 'http://ipv4.sbg.proof.ovh.net/files/10Gio.dat'

urls = (url_1m, url_10m, url_100m, url_1g, url_10g)


# ---------------------------------------------------------------------------------
class WorkerThread(QtCore.QThread):
    """
    Worker thread
    """

    def __init__(self, parent_thread):
        QtCore.QThread.__init__(self, parent_thread)

    def run(self):
        self.running = True
        success = self.do_work()
        self.emit(QtCore.SIGNAL('jobFinished(PyQt_PyObject)'), success)

    def stop(self):
        self.running = False
        pass

    def do_work(self):
        return True

    def clean_up(self):
        pass


# ---------------------------------------------------------------------------------
class LongRunningTask(WorkerThread):
    def __init__(self, parent_thread, url, filepath, filename, index):
        WorkerThread.__init__(self, parent_thread)

        self.url = url
        self.filepath = filepath
        self.filename = filename
        self.index = index

    def do_work(self):
        t = time.time()
        self.emit(QtCore.SIGNAL('threadText(PyQt_PyObject)'), 'Thread %d started\n' % self.index)

        try:
            # The urlretrieve method will copy a network object to a local file
            urllib.request.urlretrieve(url=self.url,
                                       filename=os.path.join(self.filepath,
                                                             self.filename))
        except IOError as error:
            self.emit(QtCore.SIGNAL('threadText(PyQt_PyObject)'),
                      'Thread %d error - ' % self.index + str(error) + '\n')
        finally:
            self.emit(QtCore.SIGNAL('threadText(PyQt_PyObject)'),
                      'Thread %d ended %.2f s\n' % (self.index, time.time() - t))
            return True


# ---------------------------------------------------------------------------------
class Console(QtGui.QDialog):
    """
    Console window
    """

    def __init__(self):
        super(self.__class__, self).__init__()

        self.resize(400, 200)
        self.setWindowTitle("Console")
        self.setModal(True)

        self.setLayout(QtGui.QVBoxLayout())

        # Text edit
        # -------------------------------------------------------------------------
        self.textEdit = QtGui.QPlainTextEdit(self)
        self.textEdit.setReadOnly(True)
        self.textEdit_cursor = QtGui.QTextCursor(self.textEdit.document())
        self.layout().addWidget(self.textEdit)

        # Ok / Close
        # -------------------------------------------------------------------------
        self.button_box = QtGui.QDialogButtonBox(self)
        self.button_box.setStandardButtons(QtGui.QDialogButtonBox.Close)
        self.button_box.button(QtGui.QDialogButtonBox.Close).setEnabled(False)
        self.layout().addWidget(self.button_box)

        # Connect definition
        # -------------------------------------------------------------------------
        self.connect(self.button_box.button(QtGui.QDialogButtonBox.Close),
                     QtCore.SIGNAL('clicked()'),
                     self.reject)

        # Post-Initialization
        # -------------------------------------------------------------------------
        self.queue = queue.Queue()
        # self.queue = queue.Queue(maxsize=2)
        self.run_thread()

    # Connect functions
    # -----------------------------------------------------------------------------
    def cancel_thread(self):
        self.workerThread.stop()

    def job_finished_from_thread(self, success):
        self.workerThread.stop()
        self.queue.get()

        # Stop the pulsation
        if self.queue.empty():
            self.button_box.button(QtGui.QDialogButtonBox.Close).setEnabled(True)

        self.emit(QtCore.SIGNAL('jobFinished(PyQt_PyObject)'), success)

    def text_from_thread(self, value):
        self.textEdit.insertPlainText(value)
        cursor = self.textEdit.textCursor()
        self.textEdit.setTextCursor(cursor)

    def run_thread(self):
        for index, url in enumerate(urls):
            self.workerThread = LongRunningTask(parent_thread=self,
                                                url=url,
                                                filepath='C:\\Users\\philippe\\Downloads',
                                                filename='url_%d.txt' % index,
                                                index=index)
            self.connect(self.workerThread,
                         QtCore.SIGNAL('jobFinished(PyQt_PyObject)'),
                         self.job_finished_from_thread)
            self.connect(self.workerThread,
                         QtCore.SIGNAL('threadText(PyQt_PyObject)'),
                         self.text_from_thread)

            self.queue.put(self.workerThread)
            self.workerThread.start()

            # If I set the queue to maxsize=2, how to manage it here
            '''
            while not self.queue.full():
                self.queue.put(self.workerThread)
                self.workerThread.start()
            '''

# ---------------------------------------------------------------------------------
if __name__ == '__main__':
    app = QtGui.QApplication(sys.argv)
    window = Console()
    window.show()
    app.exec_()

问题: 不幸的是,我遇到了其他类型的困难。实际上,队列可以包含大量线程(超过 100 个)。 1. 如何像QthreadPool及其setMaxThreadCount方法一样,管理并行运行的线程数量,以防止系统完全崩溃?

关于python - QThreadPool - 如何中断/如何明智地使用 waitForDone 方法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47250025/

相关文章:

python - 如何显示 PyQt 模式对话框并在关闭后从其控件中获取数据?

python - 如何每次比以前多移动一个列表?

python - 垃圾。 LinkExtractor 中的意外符号

python - 链接 loc 和 iloc 后更改 Pandas 中的值

python - 有没有办法管理列表中的 float ?

python - PyQt:插槽被多次调用

python - 如何将字典映射转换为 'integer aliasing'

python-3.x - Linux pyarrow undefined symbol

python - 删除s3文件也会删除文件夹python代码

python - 使用 pyuic 将 .ui 转换为 .py?