Python 3.4 concurrent.futures.Executor provides no control to pause and resume threads

Tags: python multithreading python-multithreading python-3.4 concurrent.futures

I am using concurrent.futures.ThreadPoolExecutor for multithreading. I am hitting some HTTP services, and I want enough control over the threads that when the server goes down I can pause execution, bring the server back up, and then resume.

The trigger for "server is down" is a check for whether a certain file exists at a specific location; when it does, I have to pause execution.
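As background for the pause/resume part of the question: a running thread cannot be forcibly paused, but tasks can pause themselves cooperatively with a `threading.Event`. The sketch below is not from the original post; `fetch` and `watcher` are hypothetical stand-ins for the real HTTP call and the file check, and this approach only pauses *between* requests, it cannot interrupt a request already in flight:

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# The event starts "set" (running); clear() pauses workers, set() resumes them.
server_up = threading.Event()
server_up.set()

def fetch(url):
    # Each task blocks here while the server is marked down.
    server_up.wait()
    return "fetched " + url  # placeholder for the real urllib call

def watcher():
    # Placeholder for the file-existence check: pause, then resume shortly after.
    server_up.clear()   # pause: tasks block in server_up.wait()
    time.sleep(0.2)
    server_up.set()     # resume all waiting tasks

with ThreadPoolExecutor(max_workers=4) as executor:
    t = threading.Thread(target=watcher)
    t.start()
    results = list(executor.map(fetch, ["a", "b", "c"]))
    t.join()

print(results)  # ['fetched a', 'fetched b', 'fetched c']
```

Whether a task checks the event once before starting (as here) or between retries is a design choice; either way the pause takes effect only at those checkpoints.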

Now, concurrent.futures.Executor.shutdown() signals the executor that it should free any resources it is using once the currently pending futures are done executing.

But when I call the executor's shutdown() method, it does not stop the threads immediately; it only shuts down after the entire execution has finished.
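That behavior can be seen in a small standalone sketch: `shutdown(wait=True)`, the default, blocks until the futures that are already running have finished, rather than killing them:

```python
import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=2)
# Two 0.5-second tasks, one per worker, so both start immediately.
futures = [executor.submit(time.sleep, 0.5) for _ in range(2)]

start = time.monotonic()
executor.shutdown(wait=True)   # blocks until both running tasks finish
elapsed = time.monotonic() - start

print(elapsed >= 0.5)                     # True: shutdown waited for them
print(all(f.done() for f in futures))     # True
```

So shutdown() is a "no new work, then wind down" signal, not a cancellation mechanism for work that is already in progress.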

In fact, I am only calling shutdown() because I could not find pause and resume anywhere in concurrent.futures. So as a workaround, once a thread finishes executing I remove its URL from the list; that way I can pass the remaining list and call the same method again.

Here is the code:

import concurrent.futures
import urllib.request
import os.path
import datetime
import sys
import pathlib
from errno import ENOENT, EACCES, EPERM
import time
import threading

listOfFilesFromDirectory =  []
webroot = settings.configuration.WEBSERVER_WEBROOT    # "settings" is a project-specific module (not shown)
WEBSERVER_PORT = settings.configuration.WEBSERVER_PORT
shutdown = False


#populating the list with the urls from a file
def triggerMethod(path):
    try:
        with open(path) as f:
            for line in f:
                listOfFilesFromDirectory.append(line.strip())   # strip the trailing newline
    except IOError as err:
    except IOError as err:
        if err.errno == ENOENT:
            #logging.critical("document.txt file is missing")
            print("document.txt file is missing")
        elif err.errno in (EACCES, EPERM):
            #logging.critical("You are not allowed to read document.txt")
            print("You are not allowed to read document.txt")
        else:
            raise   

# Called to stop the threads and restart after a 100-sec sleep; the list always holds the URLs that have not yet been executed.
def stopExecutor(executor):
    filePath = r"C:\logs\serverStopLog.txt"   # raw string so the backslashes are taken literally
    while not shutdown:
        time.sleep(5)
        if os.path.isfile(filePath):
            executor.shutdown()
            time.sleep(100)
            runRegressionInMultipleThreads()
            break

def load_url(url, timeout):
    conn = urllib.request.urlopen('http://localhost:' + WEBSERVER_PORT + "/" + url, timeout = timeout)
    return conn.info()

def trigegerFunc():
    global shutdown   # needed so the assignment below is seen by the watcher thread
    # We can use a with statement to ensure threads are cleaned up promptly
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        # Start the load operations and mark each future with its URL
        future_to_url = {executor.submit(load_url, url, 60): url for url in listOfFilesFromDirectory}

        t = threading.Thread(target=stopExecutor, args=(executor,))   # args must be a tuple
        t.start()
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
                listOfFilesFromDirectory.remove(url)
            else:
                if data:
                    if "200" in data:
                        listOfFilesFromDirectory.remove(url)
                    else:
                        listOfFilesFromDirectory.remove(url)
                else:
                    listOfFilesFromDirectory.remove(url)
        shutdown = True
        t.join()                


triggerMethod(r"C:\inetpub\wwwroot")
trigegerFunc()

Best Answer

You cannot cancel or pause/resume threads in Python. executor.shutdown() does exactly what the documentation you quoted says:

Signal the executor that it should free any resources that it is using when the currently pending futures are done executing.

Note the bolded part - the executor only shuts down once all currently executing tasks are done. To get the kind of control you are looking for, you need to run the urllib call in a separate process, like this (this is a simplified version of your script):

import time
import os.path
import threading
import urllib.request
import multiprocessing
import concurrent.futures
from multiprocessing import cpu_count

shutdown = False
should_cancel = False

def stopTasks():
    global should_cancel
    filePath = r"C:\logs\serverStopLog.txt"
    while not shutdown:
        time.sleep(5)
        if os.path.isfile(filePath):
            should_cancel = True
            break

def _load_url(url, timeout, q):
    # Runs in the child process; the result comes back through the queue.
    conn = urllib.request.urlopen('http://localhost:' + WEBSERVER_PORT +
                                  "/" + url, timeout=timeout)
    q.put(conn.info())

def load_url(url, timeout):
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=_load_url, args=(url, timeout, q))
    p.start()
    while p.is_alive():
        time.sleep(.5)
        if should_cancel:
            p.terminate()  # This will actually kill the process, cancelling the operation
            break # You could return something here that indicates it was cancelled, too.
    else:
        # We'll only enter this if we didn't `break` above.
        out = q.get()
        p.join()
        return out

def triggerFunc():
    global shutdown
    with concurrent.futures.ThreadPoolExecutor(max_workers=cpu_count()) as executor:
        # Start the load operations and mark each future with its URL
        future_to_url = {executor.submit(load_url, url, 60):
                             url for url in listOfFilesFromDirectory}
        t = threading.Thread(target=stopTasks)
        t.start()
        for future in concurrent.futures.as_completed(future_to_url):
            info = future.result()
            print("done: {}".format(info))
            # other stuff you do
        shutdown = True
        t.join()

if __name__ == "__main__":
    triggerFunc()

Because we can actually kill the child process by sending it a SIGTERM, we can truly cancel the urlopen operation while it is still in progress.

Regarding "Python 3.4 concurrent.futures.Executor provides no control to pause and resume threads", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/24963016/
