Python threading/subprocess; thread object invalid while the subprocess is still running

Tags: python multithreading python-2.7

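I'm writing some monitoring tools. One of them walks a list of NFS-mounted filesystems and tries to write a test file to each share. I spawn one thread per filer being tested. After a certain timeout, I want to kill any threads that are still running. I don't need to collect the threads' output, and I don't need to join or connect to any of them; I just need to stop any thread that is still running after the timeout.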
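If Python 3.3 or later is an option (an assumption; the question targets 2.7, where `Popen.wait()` has no `timeout` parameter), each worker can enforce its own deadline, so the main thread never has to reach into another thread's state. This is a minimal sketch, not the poster's design: `TimedCommandThread` is a hypothetical name, and a sleeping Python child stands in for the NFS write test.

```python
import subprocess
import sys
import threading


class TimedCommandThread(threading.Thread):
    """Hypothetical worker: runs one command and kills it if it outlives `timeout` seconds.

    Assumes Python 3.3+, where Popen.wait() accepts a `timeout` argument.
    """

    def __init__(self, cmd, timeout):
        super().__init__()
        self.cmd = cmd
        self.timeout = timeout
        self.timed_out = False
        self.returncode = None

    def run(self):
        proc = subprocess.Popen(self.cmd)
        try:
            self.returncode = proc.wait(timeout=self.timeout)
        except subprocess.TimeoutExpired:
            # Still running at the deadline: kill the child, then reap it.
            self.timed_out = True
            proc.kill()
            self.returncode = proc.wait()


if __name__ == "__main__":
    # Stand-in for the NFS write test: a child that sleeps far longer than the timeout.
    slow = [sys.executable, "-c", "import time; time.sleep(300)"]
    workers = [TimedCommandThread(slow, timeout=1) for _ in range(3)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print([w.timed_out for w in workers])  # [True, True, True]
```

With this layout the main thread only needs `join()`; each thread kills its own child, so there is no window in which another thread reads a half-initialized attribute.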

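To test this, I deliberately run /bin/sleep 300 as the subprocess in each thread, so I know that the process spawned by each thread will still be running when I loop over all the active threads and try to kill them. This works for 5 to 20 threads, but eventually fails on one thread with an error saying that self.process.pid is no longer valid. Which thread it fails on is random, and not necessarily the same one as in the previous run.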

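import logging
import os
import re
import signal
import subprocess
import sys
import threading
import time
from ConfigParser import SafeConfigParser

class NFSWriteTestThread(threading.Thread):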

    def __init__(self, filer, base_mnt_point, number):
        super(NFSWriteTestThread, self).__init__()
        self.tname    = filer
        self.tnum     = number
        self.filer    = filer
        self.mntpt    = base_mnt_point
        self.process  = None
        self.pid      = None

    def run(self):
        start = time.time()
#        self.process = subprocess.Popen(['/bin/dd', 'if=/dev/zero', 'bs=1M', 'count=5', 'of=' + self.testfile], shell=False)
        self.process = subprocess.Popen(['/bin/sleep', '300'], shell=False)
        time.sleep(1)
        logger.debug("DEBUG: %s=%d" % (self.tname, self.process.pid))
        self.pid     = self.process.pid
        logger.info("  NFS write test command initiated on '%s', pid=%d" % (self.filer, self.pid))
        self.process.wait()
#        self.output, self.error = self.process.communicate()
        end = time.time()
        logger.info("  NFS write test for '%s' completed in %d seconds" % (self.filer, end - start))
        return

    def getThreadName(self):
        return self.tname

    def getThreadNum(self):
        return self.tnum

    def getThreadPID(self):
        if self.pid:
            return self.pid
        else:
            return "unknown"

    def isAlive(self):
        if not self.process:
            logger.debug("Error: self.process is invalid (%s)" % type(self.process))
#        if self.process.poll():
#            logger.info("NFS write test operation for thread '%s' is still active" % self.filer)
#        else:
#            logger.info("NFS write test operation for thread '%s' is inactive" % self.filer)
        return

    def terminate(self):
        os.kill(self.process.pid, signal.SIGTERM)
        return

    def kill(self):
        os.kill(self.process.pid, signal.SIGKILL)
        return

def initLogging(config):

    logfile   = os.path.join(config['logdir'], config['logfilename'])
    fformat   = logging.Formatter('%(asctime)s   %(message)s', "%Y-%m-%d %H:%M:%S %Z")
    cformat   = logging.Formatter('%(asctime)s   %(message)s', "%Y-%m-%d %H:%M:%S %Z")
    clogger   = None
    flogger   = None

    if config['debug']:
        loglevel = logging.DEBUG

    if not os.path.exists(config['logdir']):
        os.makedirs(config['logdir'])
        os.chmod(config['logdir'], 0700)
        os.chown(config['logdir'], 0, 0)

    try:
        logger = logging.getLogger('main')
        logger.setLevel(logging.DEBUG)

        # Define a file logger
        flogger = logging.FileHandler(logfile, 'w')
        flogger.setLevel(logging.DEBUG)
        flogger.setFormatter(fformat)
        logger.addHandler(flogger)

        # Define a console logger if verbose
        if config['verbose']:
            clogger = logging.StreamHandler()
            clogger.setLevel(logging.DEBUG)
            clogger.setFormatter(cformat)
            logger.addHandler(clogger)
    except Exception, error:
        print "Error: Unable to initialize file logging:  %s" % error
        sys.exit(1)

    logger.info("Script initiated.")

    logger.info("Using the following configuration:")
    for key, value in sorted(config.iteritems()):
        logger.info("    %20s = '%-s'" % (key, value))

    return logger

def parseConfigFile(cfg):

    if not os.path.isfile(cfg['cfgfile']) or not os.access(cfg['cfgfile'], os.R_OK):
        print "Error: '%s' does not exist or is not readable, terminating." % cfg['cfgfile']
        sys.exit(1)

    config = SafeConfigParser()
    config.read(cfg['cfgfile'])

    _cfg = dict(config.items(cfg['cfgfilestanza']))

    _cfgfilers = config.get(cfg['cfgfilestanza'], 'managed_filers')
    _tmpfilers = _cfgfilers.split(',')

    # populate a list containing all filers which will be merged into the global cfg[] dict
    _cfg['filers'] = []

    for _f in _tmpfilers:
        _cfg['filers'].append(_f.strip())

    return _cfg


logger = initLogging(cfg)
cfg.update(parseConfigFile(cfg))

threads     = []
numThreads  = 0

for filer in cfg['filers']:
    numThreads += 1
    logger.debug("  spawning NFS write test thread for '%s', thread number %s" % (filer, numThreads))
    t = NFSWriteTestThread(filer, cfg['base_mnt_point'], numThreads)
    t.start()
    threads.append(t)
#    time.sleep(1)

logger.info("spawned %d NFS write test child threads" % numThreads)

logger.info("sleeping for %d seconds" % cfg['timeout'])
time.sleep(cfg['timeout'])

if (threading.activeCount() > 1):
    logger.info("there are %d NFS write test threads active after the timeout:" % (threading.activeCount() - 1))
    for thr in threading.enumerate():
        logger.debug("theadname=%s" % thr.name)
        if re.match("MainThread", thr.getName()):
            pass
        else:
            logger.info("thread '%s' (thread %d) is still alive" % (thr.getThreadName(), thr.getThreadNum()))
#            thr.isAlive()
            logger.info("killing thread for '%s' (pid=XX) with SIGTERM" % (thr.getThreadName()))
#            logger.info("killing thread for '%s' (pid=%d) with SIGTERM" % (thr.getThreadName(), thr.getThreadPID()))
            thr.kill()

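The loop above walks threading.enumerate(), skips the main thread by name, and calls kill() unconditionally, so any worker whose self.process is still None raises an AttributeError like the one in the output that follows. A minimal defensive sketch, assuming Python 3 and a hypothetical `CommandThread` standing in for NFSWriteTestThread, iterates the script's own list of threads and skips workers that do not have a Popen object yet:

```python
import subprocess
import sys
import threading


class CommandThread(threading.Thread):
    """Hypothetical stand-in for NFSWriteTestThread that records its Popen object."""

    def __init__(self, cmd):
        super().__init__()
        self.cmd = cmd
        self.process = None  # stays None until Popen() has returned

    def run(self):
        self.process = subprocess.Popen(self.cmd)
        self.process.wait()


def kill_stragglers(threads):
    """Kill the child of every live thread; return the threads that had no child yet."""
    not_started = []
    for t in threads:              # iterate our own list, not threading.enumerate()
        if not t.is_alive():
            continue
        proc = t.process           # read once: the worker may assign it concurrently
        if proc is None:
            not_started.append(t)  # Popen() has not returned yet; nothing to kill
        elif proc.poll() is None:
            proc.kill()            # the worker's wait() returns and run() exits
    return not_started
```

Threads returned by `kill_stragglers()` can be retried after a short pause instead of crashing the whole sweep.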

logger.info("Script complete")
sys.exit(0)

Here is the output:

2014-11-10 09:00:22 CST   there are 173 NFS write test threads active after the timeout:
2014-11-10 09:00:22 CST   theadname=Thread-165
2014-11-10 09:00:22 CST   thread 'hostname1' (thread 165) is still alive
2014-11-10 09:00:22 CST   killing thread for 'hostname1' (pid=XX) with SIGTERM
2014-11-10 09:00:22 CST   theadname=Thread-97
2014-11-10 09:00:22 CST   thread 'hostname2' (thread 97) is still alive
2014-11-10 09:00:22 CST     NFS write test for 'hostname1' completed in 60 seconds
2014-11-10 09:00:22 CST   killing thread for 'hostname2' (pid=XX) with SIGTERM
2014-11-10 09:00:22 CST   theadname=Thread-66
2014-11-10 09:00:22 CST   thread 'hostname3' (thread 66) is still alive
2014-11-10 09:00:22 CST     NFS write test for 'hostname2' completed in 60 seconds
2014-11-10 09:00:22 CST   killing thread for 'hostname3' (pid=XX) with SIGTERM
2014-11-10 09:00:22 CST   theadname=Thread-121
2014-11-10 09:00:22 CST   thread 'hostname4' (thread 121) is still alive
2014-11-10 09:00:22 CST   killing thread for 'hostname4' (pid=XX) with SIGTERM
Traceback (most recent call last):
2014-11-10 09:00:22 CST     NFS write test for 'hostname3' completed in 60 seconds
  File "./NFSWriteTestCheck.py", line 199, in <module>
    thr.kill()
  File "./NFSWriteTestCheck.py", line 84, in kill
    os.kill(self.process.pid, signal.SIGKILL)
AttributeError: 'NoneType' object has no attribute

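When this error appears, the process is still running; I verified that with ps in the shell. Why is the thread object no longer valid? When the error is thrown, the thread's execution should be at this line: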

self.process.wait()

I'm scratching my head here, wondering whether I've hit a bug or something else.
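One way to reason about this (an assumption, not confirmed in the thread): self.process is assigned only after subprocess.Popen() returns, and on POSIX Popen creates the child process before it returns, so there can be a window in which ps already shows the child while the attribute is still None. The sketch below, assuming Python 3 and a hypothetical `HandshakeThread`, has the worker set a threading.Event once the attribute is assigned, and kill() waits on that event before touching the Popen object:

```python
import subprocess
import sys
import threading


class HandshakeThread(threading.Thread):
    """Hypothetical worker that signals when its Popen object is ready to be killed."""

    def __init__(self, cmd):
        super().__init__()
        self.cmd = cmd
        self.process = None
        self.ready = threading.Event()  # set once Popen() has returned (or raised)

    def run(self):
        try:
            self.process = subprocess.Popen(self.cmd)
        finally:
            self.ready.set()  # wake a waiting kill() even if Popen() raised
        self.process.wait()

    def kill(self, wait_for_start=5.0):
        """Return True if a kill signal was sent to a still-running child."""
        if not self.ready.wait(wait_for_start):
            return False  # Popen() did not return in time
        if self.process is None or self.process.poll() is not None:
            return False  # Popen() failed, or the child already exited
        self.process.kill()
        return True
```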

Best answer

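You can't stop threads; you can only interrupt what they are doing. In your case, the thread is waiting on subprocess.call. Setting an event has no effect, because your thread never waits on that event. The solution here is to kill the subprocess, which means you need the Popen object.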
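The claim that an Event cannot wake a thread blocked in Popen.wait(), while terminating the child can, is easy to check. A small sketch, assuming Python 3 and using a sleeping Python child as a stand-in for the test command (`demo` is a hypothetical helper):

```python
import subprocess
import sys
import threading


def demo(grace=0.5):
    """Show that an Event cannot interrupt Popen.wait(), but terminating the child can.

    Returns (alive_after_event, alive_after_terminate).
    """
    stop = threading.Event()  # an Event the worker never checks while blocked
    proc = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"])

    def worker():
        proc.wait()  # blocks until the child exits; it never looks at `stop`

    t = threading.Thread(target=worker)
    t.start()

    stop.set()
    t.join(grace)
    alive_after_event = t.is_alive()  # still blocked in wait()

    proc.terminate()  # SIGTERM the child through its Popen object
    t.join(10)
    return alive_after_event, t.is_alive()
```

Calling `demo()` should return `(True, False)`: the worker is still blocked after the event is set, and exits once the child is terminated.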

I put the implementation directly in the run method so that the Popen object is at hand.

import subprocess
import threading

class NFSWriteTestThread(threading.Thread):

    def __init__(self, filer, mntpoint, number):
        super(NFSWriteTestThread, self).__init__()
        self.name   = filer
        self.filer  = filer
        self.mntpt  = mntpoint
        self.tnum   = number
        self._proc  = None   # set once Popen() has returned

    def run(self):
        # Build the test file path from the values stored in __init__
        testfile = "%s/%s/test/test.%s" % (self.mntpt, self.filer, self.filer)
        testcmd  = "/bin/bash -c '/bin/dd if=/dev/zero bs=1024 count=1024 of=" + testfile + " >/dev/null 2>/dev/null; sleep 120'"
        self._proc = subprocess.Popen(testcmd, shell=True)
        self._proc.wait()
        return

    def getName(self):
        return self.name

    def kill(self):
        # Terminate the child; the thread's wait() then returns and run() exits
        if self._proc is not None:
            self._proc.terminate()

    def stopped(self):
        # True if the child has exited, or was never started
        if self._proc is not None:
            return self._proc.poll() is not None
        else:
            return True
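Because testcmd goes through a shell that runs several commands, terminate() signals only the process that Popen started, and commands the shell forked (such as the sleep) may keep running. A sketch of one workaround, assuming POSIX and Python 3.2+ (`start_new_session`; on 2.7, `preexec_fn=os.setsid` plays the same role), puts the shell in its own process group and signals the whole group with os.killpg. `GroupKillThread` is a hypothetical name, not part of the answer:

```python
import os
import signal
import subprocess
import threading


class GroupKillThread(threading.Thread):
    """Hypothetical variant of the answer's class that kills the child's whole process group.

    Assumes POSIX and Python 3.2+ (start_new_session); on Python 2.7 the
    equivalent is preexec_fn=os.setsid.
    """

    def __init__(self, name, shell_cmd):
        super().__init__(name=name)
        self.shell_cmd = shell_cmd
        self._proc = None

    def run(self):
        # start_new_session=True makes the shell the leader of a new process group,
        # so the commands it forks (dd, sleep, ...) belong to that group as well.
        self._proc = subprocess.Popen(self.shell_cmd, shell=True, start_new_session=True)
        self._proc.wait()

    def kill(self, sig=signal.SIGTERM):
        proc = self._proc
        if proc is not None and proc.poll() is None:
            try:
                # With start_new_session=True the group id equals the shell's pid.
                os.killpg(proc.pid, sig)
            except ProcessLookupError:
                pass  # the whole group already exited

    def stopped(self):
        return self._proc is None or self._proc.poll() is not None
```

Usage mirrors the answer: start the threads, sleep for the timeout, then call `kill()` on every thread for which `stopped()` is false.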

A similar question, "Python threading/subprocess; thread object invalid when subprocess is still running", can be found on Stack Overflow: https://stackoverflow.com/questions/26808338/
