python - 很长的I/O进程能否由线程处理

标签 python multithreading pyqt multiprocessing pyqt4

我在这里开始一个新主题,该主题将链接到 this question .

我邀请您阅读背景以了解全局概念。

所以我有一个依赖于 python 3.2 API(由私有(private)公司开发)的下载功能。每个文件的过程最多可能需要 400 秒。

很明显,我没有只有一个文件要下载,所以我已经尝试了好几天将每个下载过程都放在一个线程池中。池中的每个线程都应该完全独立于 GUI 主线程。当其中一个完成时,它应该只向 GUI 发送一个信号。

我做了几次测试,但无论使用什么技术,但是

  1. GUI 卡住了;
  2. 结果仅在所有线程处理结束时给出,而不是按要求逐一给出。

我觉得API给的下载方式是阻塞函数,不能线程化。

所以我的问题很简单:如何知道一个 I/O 方法是否可以通过线程处理。


2017-11-24更新

您会在下面找到部分符合我预期的初稿(带有串联 multiprocessing.pool/map_async)。正如您将看到的,不幸的是,我不得不插入一个“忙等待循环”,以便在 QPlainTextEdit 上获得一些关于正在发生的事情的信息。

任务的结果仅在全局处理结束时给出(行为 map_async)。这不是我要找的。我想插入更多实时时间,并立即在控制台上查看每个已完成任务的消息。

import time
import multiprocessing
import private.library as bathy
from PyQt4 import QtCore, QtGui
import os
import sys

user = 'user'
password = 'password'
server = 'server'
basename = 'basename'

workers = multiprocessing.cpu_count()

node = bathy.NodeManager(user, password, server)
database = node.get_database(basename)

ids = (10547, 3071, 13845, 13846, 13851, 13844, 5639, 4612, 4613, 954,
       961, 962, 4619, 4620, 4622, 4623, 4624, 4627, 4628, 4631,
       4632, 4634, 4635, 4638, 4639, 4640, 4641, 4642, 10722, 1300,
       1301, 1303, 1310, 1319, 1316, 1318, 1321, 1322, 1323, 1324,
       1325, 1347, 1348, 1013, 1015, 1320, 8285, 8286, 8287, 10329,
       9239, 9039, 5006, 5009, 5011, 5012, 5013, 5014, 5015, 5025,
       5026, 4998, 5040, 5041, 5042, 5043, 11811, 2463, 2464, 5045,
       5046, 5047, 5048, 5049, 5053, 5060, 5064, 5065, 5068, 5069,
       5071, 5072, 5075, 5076, 5077, 5079, 5080, 5081, 5082, 5083,
       5084, 5085, 5086, 5087, 5088, 5090, 5091, 5092, 5093)


# ---------------------------------------------------------------------------------
def download(surface_id, index):
    global node
    global database

    t = time.time()
    message = 'Surface #%d - Process started\n' % index

    surface = database.get_surface(surface_id)
    metadata = surface.get_metadata()
    file_path = os.path.join("C:\\Users\\philippe\\Test_Download",
                             metadata["OBJNAM"] + ".surf")

    try:
        surface.download_bathymetry(file_path)
    except RuntimeError as error:
        message += "Error : " + str(error).split('\n')[0] + '\n'
    finally:
        message += ('Process ended : %.2f s\n' % (time.time() - t))

    return message


# ---------------------------------------------------------------------------------
 def pass_args(args):
    # Method to pass multiple arguments to download (multiprocessing.Pool)
    return download(*args)


# ---------------------------------------------------------------------------------
class Console(QtGui.QDialog):
    def __init__(self):
        super(self.__class__, self).__init__()

        self.resize(600, 300)
        self.setMinimumSize(QtCore.QSize(600, 300))
        self.setWindowTitle("Console")
        self.setModal(True)

        self.verticalLayout = QtGui.QVBoxLayout(self)

        # Text edit
        # -------------------------------------------------------------------------

        self.text_edit = QtGui.QPlainTextEdit(self)
        self.text_edit.setReadOnly(True)
        self.text_edit_cursor = QtGui.QTextCursor(self.text_edit.document())
        self.verticalLayout.addWidget(self.text_edit)

        # Ok / Close
        # -------------------------------------------------------------------------
        self.button_box = QtGui.QDialogButtonBox(self)
        self.button_box.setStandardButtons(QtGui.QDialogButtonBox.Close | 
                                           QtGui.QDialogButtonBox.Ok)
        self.button_box.setObjectName("button_box")
        self.verticalLayout.addWidget(self.button_box)

        # Connect definition
        # -------------------------------------------------------------------------

        self.connect(self.button_box.button(QtGui.QDialogButtonBox.Close),
                     QtCore.SIGNAL('clicked()'),
                     self.button_cancel_clicked)
        self.connect(self.button_box.button(QtGui.QDialogButtonBox.Ok),
                     QtCore.SIGNAL('clicked()'),
                     self.button_ok_clicked)

        # Post initialization
        # -------------------------------------------------------------------------
        self.pool = multiprocessing.Pool(processes=workers)

    # Connect functions
    # -----------------------------------------------------------------------------
    def button_cancel_clicked(self):
        self.close()

    def button_ok_clicked(self):
        jobs_args = [(surface_id, index) for index, surface_id in enumerate(ids)]
        async = pool.map_async(pass_args, jobs_args)
        pool.close()

        # Busy waiting loop
        while True:
            # pool.map_async has a _number_left attribute, and a ready() method
            if async.ready():
                self.write_stream("All tasks completed\n")
                pool.join()
                for line in async.get():
                    self.write_stream(line)
                break

            remaining = async._number_left
            self.write_stream("Waiting for %d task(s) to complete...\n" % remaining)
            time.sleep(0.5)


    # Other functions
    # -----------------------------------------------------------------------------
    def write_stream(self, text):
        self.text_edit.insertPlainText(text)
        cursor = self.text_edit.textCursor()
        self.text_edit.setTextCursor(cursor)
        app.processEvents()


# ---------------------------------------------------------------------------------
if __name__ == '__main__':
    app = QtGui.QApplication(sys.argv)
    window = Console()
    window.show()
    app.exec_()

问题

  1. 乍一看上面的代码是否存在概念性错误?
  2. 在这种特定情况下,我是否必须使用 apply_async 方法来获得更具交互性的东西?
  3. 您能否指导我如何使用回调函数发布自定义事件来更新控制台(@ekhumoro 建议的方法)?

2017-11-25更新

我尝试了 apply_async:

def button_ok_clicked(self):
    # Pool.apply_async - the call returns immediately instead of 
    # waiting for the result
    for index, surface_id in enumerate(ids):
        async = pool.apply_async(download, 
                                 args=(surface_id, index),
                                 callback=self.write_stream)
    pool.close()

带有回调:

def write_stream(self, text):
    # This is called whenever pool.apply_async(i) returns a result
    self.text_edit.insertPlainText(text)
    cursor = self.text_edit.textCursor()
    self.text_edit.setTextCursor(cursor)
    # Update the text edit
    app.processEvents()

不幸的是,这样做会使应用程序崩溃。我想我必须设置一个锁定机制来防止所有任务同时写入文本编辑。

最佳答案

下面是示例脚本的简化版本,展示了如何使用回调发布自定义事件。每个作业都通过 apply_async 单独处理,因此会更新一个简单的计数器以指示所有作业何时完成。

import sys, time, random, multiprocessing
from PyQt4 import QtCore, QtGui

ids = (10547, 3071, 13845, 13846, 13851, 13844, 5639, 4612, 4613, 954,
       961, 962, 4619, 4620, 4622, 4623, 4624, 4627, 4628, 4631,
       4632, 4634, 4635, 4638, 4639, 4640, 4641, 4642, 10722, 1300,
       1301, 1303, 1310, 1319, 1316, 1318, 1321, 1322, 1323, 1324,
       1325, 1347, 1348, 1013, 1015, 1320, 8285, 8286, 8287, 10329,
       9239, 9039, 5006, 5009, 5011, 5012, 5013, 5014, 5015, 5025,
       5026, 4998, 5040, 5041, 5042, 5043, 11811, 2463, 2464, 5045,
       5046, 5047, 5048, 5049, 5053, 5060, 5064, 5065, 5068, 5069,
       5071, 5072, 5075, 5076, 5077, 5079, 5080, 5081, 5082, 5083,
       5084, 5085, 5086, 5087, 5088, 5090, 5091, 5092, 5093)

def download(surface_id, index):
    t = time.time()
    message = 'Surface #%s (%s) - Process started\n' % (index, surface_id)
    time.sleep(random.random())
    message += 'Process ended : %.2f s\n' % (time.time() - t)
    return message

def pass_args(args):
    return download(*args)

class CustomEvent(QtCore.QEvent):
    DownloadComplete = QtCore.QEvent.registerEventType()

    def __init__(self, typeid, *args):
        super().__init__(typeid)
        self.data = args

class Console(QtGui.QDialog):
    def __init__(self):
        super().__init__()
        self.resize(600, 300)
        self.setMinimumSize(QtCore.QSize(600, 300))
        self.setWindowTitle("Console")
        self.verticalLayout = QtGui.QVBoxLayout(self)
        self.text_edit = QtGui.QPlainTextEdit(self)
        self.text_edit.setReadOnly(True)
        self.text_edit_cursor = QtGui.QTextCursor(self.text_edit.document())
        self.verticalLayout.addWidget(self.text_edit)
        self.button_box = QtGui.QDialogButtonBox(self)
        self.button_box.setStandardButtons(
            QtGui.QDialogButtonBox.Close | QtGui.QDialogButtonBox.Ok)
        self.button_box.setObjectName("button_box")
        self.verticalLayout.addWidget(self.button_box)
        self.button_box.button(QtGui.QDialogButtonBox.Close
            ).clicked.connect(self.button_cancel_clicked)
        self.button_box.button(QtGui.QDialogButtonBox.Ok
            ).clicked.connect(self.button_ok_clicked)
        self.pool = multiprocessing.Pool(None)

    def event(self, event):
        if event.type() == CustomEvent.DownloadComplete:
            message, complete = event.data
            self.write_stream(message)
            if complete:
                self.write_stream('Downloads complete!')
        return super().event(event)

    def button_cancel_clicked(self):
        self.close()

    def button_ok_clicked(self):
        total = len(ids)
        def callback(message):
            nonlocal total
            total -= 1
            QtGui.qApp.postEvent(self, CustomEvent(
                CustomEvent.DownloadComplete, message, not total))
        for index, surface_id in enumerate(ids):
            self.pool.apply_async(
                pass_args, [(surface_id, index)], callback=callback)

    def write_stream(self, text):
        self.text_edit.insertPlainText(text)
        cursor = self.text_edit.textCursor()
        self.text_edit.setTextCursor(cursor)

if __name__ == '__main__':

    app = QtGui.QApplication(sys.argv)
    window = Console()
    window.show()
    app.exec_()

关于python - 很长的I/O进程能否由线程处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47431119/

相关文章:

python - QApplication实例导致python shell运行缓慢

python - url_for 构建错误

python - Agg 和 Cairo 之间的 Matplotlib 后端差异

java - 多个线程可以同时等待一个对象吗?

java - SimpleThreads 示例中的 threadMessage 是否应该同步?

python-3.x - 如何使Selenium线程运行(每个线程都有自己的驱动程序)

python - QMediaPlayer。如何播放带有多个音频的视频?

python - pyQT QNetworkManager 和 ProgressBars

python - 理解 django admin readonly_fields

python - 在后台打开网络浏览器