I am currently debugging a problem that uses multiprocessing.
I have the following child process class:
class Target(multiprocessing.Process):
    def __init__(self, work_queue, result_queue, suite_name, test_settings, html_log_dir, output_file, ip_address, fit_dir):
        multiprocessing.Process.__init__(self)
        # initialize other variables

    def run(self):
        print multiprocessing.current_process()
        suite_start_time = time.clock()
        while not self.kill_received:
            # get a task
            try:
                job = self.work_queue.get(True, 2)
            except Queue.Empty:
                self._log('Work queue empty, creating XML result file')
                self.create_xml_result_file(suite_start_time)
                break
            # the actual processing, run the test.
        fitnesse_common.log_output("\n(PID " + str(self.pid) + "): End of process")

    def create_xml_result_file(self, start_time):
        # generate result
The parent process essentially just starts several (12) Targets and waits for them all to join.
The problem is that, for some reason, the child processes run to the end of the run function (I see the "End of process" print), and then for some reason do not terminate, which prevents the parent process from continuing.
EDIT - Not all of the spawned processes hang, only a few of them. Of the 12 spawned processes, usually only 2-4 hang after finishing the run function.
I considered calling terminate at the end of the run function, but the Python documentation indicates that is a bad idea.
I have looked at several different articles on Stack Overflow about Python multiprocessing, and most of them relate to problems in the parent process.
Any thoughts or help would be greatly appreciated.
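For context, the pattern described above (children put results on a queue, the parent joins before draining it) is exactly the configuration the multiprocessing documentation warns about: a process that has put items on a queue will not terminate until its feeder thread has flushed all buffered items into the underlying pipe. A minimal Python 3 sketch of the documented-safe ordering (the `worker` function and payload size here are illustrative, not from the question) drains the queue before joining:

```python
import multiprocessing

def worker(q):
    # put a payload large enough that the queue's feeder thread cannot
    # flush it all into the pipe buffer before the process tries to exit
    q.put('x' * (1 << 20))

if __name__ == '__main__':
    q = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=worker, args=(q,)) for _ in range(4)]
    for p in procs:
        p.start()
    # drain the queue BEFORE joining: a child process does not exit
    # until its feeder thread has flushed everything it put on the queue
    results = [q.get() for _ in procs]
    for p in procs:
        p.join()
    print(len(results))
```

Joining first and draining afterwards, with payloads this size, is the deadlock-prone ordering.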
UPDATE: Here is a script that easily reproduces the problem:
import multiprocessing, Queue
import subprocess
import time
import sys

class Target(multiprocessing.Process):
    def __init__(self, work_queue, results_queue, delay_length):
        # base class initialization
        multiprocessing.Process.__init__(self)
        # job management stuff
        self.work_queue = work_queue
        self.results_queue = results_queue
        self.delay_length = delay_length
        self.kill_received = False

    def run(self):
        while not self.kill_received:
            # get a task
            try:
                job = self.work_queue.get(True, 2)
            except Queue.Empty:
                self._log('Work queue empty, prepare to terminate')
                break
            time.sleep(self.delay_length)
            self._log("Sleeping done")
            results = self._run_an_application(job)
            self.results_queue.put(results)
            self._log("Have put results on queue " + str(job) + "-" + results)
        self._log("\n(PID " + str(self.pid) + "): End of process")

    def _log(self, text):
        print ('PID ' + str(self.pid) + ' => ' + text)
        sys.stdout.flush()

    def _run_an_application(self, app):
        try:
            test_output = subprocess.check_output(app)
        except subprocess.CalledProcessError, e:
            self._log('### Process check_output threw exception CalledProcessError')
            test_output = e.output
        return test_output

if __name__ == "__main__":
    test_jobs = []
    started_targets = []
    # run
    # load up work queue
    for i in range(500):
        test_jobs.append('spewage')
    work_queue = multiprocessing.Queue()
    for job in test_jobs:
        work_queue.put(job)
    # create a queue to pass to targets to store the results
    result_queue = multiprocessing.Queue()
    # spawn targets
    for i in range(12):
        started_targets.append(Target(work_queue, result_queue, i))
    # start all targets
    for i in range(len(started_targets)):
        started_targets[i].start()
        print "starting process no %s with id: %s" % (i, started_targets[i].pid)
    print 'Waiting for all processes to join'
    # wait for all targets to finish
    for i in range(len(started_targets)):
        started_targets[i].join()
    print 'All processes have joined'
    # collect the results off the queue
    while not result_queue.empty():
        target_result = result_queue.get()
        print "Test job - " + target_result
    print ('All Tests completed')
Here is the source of the "spewage" application (it is C++):
#include <iostream>
#include <windows.h>

using namespace std;

int main()
{
    for (int i = 0; i < 500; i++)
    {
        cout << "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ" << endl;
        Sleep(20);
    }
    return 0;
}
Since the issue appears to be related to the amount pushed to stdout, the C++ program could easily be replaced by another script that prints a lot of output.
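For example, a hypothetical Python stand-in for the C++ program (same 62-character line, same 20 ms delay; `spew` is an illustrative name, not from the question) might look like:

```python
import sys
import time

# the same 62-character line the C++ program prints
LINE = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"

def spew(count=500, delay=0.02):
    # print many lines to stdout with a short delay, like the spewage app
    for _ in range(count):
        sys.stdout.write(LINE + "\n")
        sys.stdout.flush()
        time.sleep(delay)

if __name__ == "__main__":
    spew()
```

Any program that floods its stdout in this way should do; the volume of output is what matters.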
Best answer
I managed to figure out the problem. It does appear to be related to the amount of output from the subprocesses. At the end of the process's run() function, I needed to add self.results_queue.cancel_join_thread().
I am still curious why it works when there is not much stdout, but the processes hang when there is a lot. According to the Python documentation, the way I was using result_queue should have locked up consistently, even though it did not.
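As a sketch of where that call goes (a minimal Python 3 class, not the original test runner), cancel_join_thread() tells the exiting process not to wait for the queue's feeder thread to finish flushing, at the cost of possibly losing any data still buffered:

```python
import multiprocessing

class Worker(multiprocessing.Process):
    def __init__(self, results_queue):
        multiprocessing.Process.__init__(self)
        self.results_queue = results_queue

    def run(self):
        # ... do the real work and put results on the queue ...
        self.results_queue.put('result')
        # let this process exit even if the feeder thread has not
        # flushed everything into the pipe (buffered data may be lost)
        self.results_queue.cancel_join_thread()

if __name__ == '__main__':
    q = multiprocessing.Queue()
    w = Worker(q)
    w.start()
    print(q.get())
    w.join()
```

Because results can be dropped, this is a workaround rather than the documented pattern; draining the queue in the parent before joining avoids the hang without that risk.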
A similar question about Python multiprocessing child processes hanging on exit from run was found on Stack Overflow: https://stackoverflow.com/questions/31965884/