I want to run a parallel computation on some input data loaded from a file. (The file can be really big, so I use a generator for it.)
Up to a certain number of items my code runs fine, but above that threshold the program hangs (some of the worker processes never finish).
Any suggestions? (I am running it with python2.7 on 8 CPUs; 5,000 lines still work, 7,500 do not.)
First, you need an input file. Generate it in bash:
for i in {0..10000}; do echo -e "$i"'\r' >> counter.txt; done
Then, run this:
python2.7 main.py 100 counter.txt > run_log.txt
main.py:
#!/usr/bin/python2.7
import os, sys, signal, time
import Queue
import multiprocessing as mp

def eat_queue(job_queue, result_queue):
    """Eats input queue, feeds output queue
    """
    proc_name = mp.current_process().name
    while True:
        try:
            job = job_queue.get(block=False)
            if job == None:
                print(proc_name + " DONE")
                return
            result_queue.put(execute(job))
        except Queue.Empty:
            pass

def execute(x):
    """Does the computation on the input data
    """
    return x*x

def save_result(result):
    """Saves results in a list
    """
    result_list.append(result)

def load(ifilename):
    """Generator reading the input file and
    yielding it row by row
    """
    ifile = open(ifilename, "r")
    for line in ifile:
        line = line.strip()
        num = int(line)
        yield (num)
    ifile.close()
    print("file closed".upper())

def put_tasks(job_queue, ifilename):
    """Feeds the job queue
    """
    for item in load(ifilename):
        job_queue.put(item)
    for _ in range(get_max_workers()):
        job_queue.put(None)

def get_max_workers():
    """Returns optimal number of processes to run
    """
    max_workers = mp.cpu_count() - 2
    if max_workers < 1:
        return 1
    return max_workers

def run(workers_num, ifilename):
    job_queue = mp.Queue()
    result_queue = mp.Queue()

    # decide how many processes are to be created
    max_workers = get_max_workers()
    print "processes available: %d" % max_workers
    if workers_num < 1 or workers_num > max_workers:
        workers_num = max_workers

    workers_list = []
    # a process for feeding job queue with the input file
    task_gen = mp.Process(target=put_tasks, name="task_gen",
                          args=(job_queue, ifilename))
    workers_list.append(task_gen)

    for i in range(workers_num):
        tmp = mp.Process(target=eat_queue, name="w%d" % (i+1),
                         args=(job_queue, result_queue))
        workers_list.append(tmp)

    for worker in workers_list:
        worker.start()

    for worker in workers_list:
        worker.join()
        print "worker %s finished!" % worker.name

if __name__ == '__main__':
    result_list = []
    args = sys.argv
    workers_num = int(args[1])
    ifilename = args[2]
    run(workers_num, ifilename)
Best answer
This is because nothing in your code ever closes result_queue. The behavior then depends on the internal queue-buffering details: if "not much" data is waiting, everything looks fine, but if "a lot" is waiting, everything freezes up. Not much more can be said, because it involves layers of internal magic ;-) But the docs do warn about it:
Warning
As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.
This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.
Note that a queue created using a manager does not have this issue. See Programming guidelines.
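To see the warning in action, here is a minimal sketch (not from the original code) of that deadlock: the child puts more items on a queue than the underlying pipe buffer can hold, so its feeder thread cannot flush them, the child never exits, and the parent blocks forever in join():

import multiprocessing as mp

def producer(q):
    # far more data than the OS pipe buffer can hold; the child process
    # cannot terminate until these buffered items have been read
    for i in range(100000):
        q.put(i)

if __name__ == '__main__':
    q = mp.Queue()
    p = mp.Process(target=producer, args=(q,))
    p.start()
    p.join()   # deadlock: nothing ever reads q, so the child never finishes

Draining q with q.get() calls before the join() (or using cancel_join_thread(), as the docs mention) lets it terminate, and that is exactly the shape of the fix below.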
An easy way to repair this: first, add
result_queue.put(None)
just before eat_queue() returns. Then add:
count = 0
while count < workers_num:
    if result_queue.get() is None:
        count += 1
in the main program, before you .join() the workers. That drains the result queue, and then everything shuts down cleanly.
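In the asker's run(), that drain loop sits between starting the workers and joining them, roughly like this (only a sketch of the placement; the rest of run() stays unchanged). If you actually need the results in the parent, this loop is also the natural place to collect every non-None item, e.g. via save_result():

    for worker in workers_list:
        worker.start()

    # drain result_queue: each worker puts one None just before it returns
    count = 0
    while count < workers_num:
        if result_queue.get() is None:
            count += 1

    for worker in workers_list:
        worker.join()
        print "worker %s finished!" % worker.name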
BTW, this code is odd:
while True:
    try:
        job = job_queue.get(block=False)
        if job == None:
            print(proc_name + " DONE")
            return
        result_queue.put(execute(job))
    except Queue.Empty:
        pass
Why the non-blocking get()? So long as the queue is empty, that turns into a CPU-hogging "busy loop". The primary point of .get() is to supply an efficient way to wait for work to show up. So:
while True:
    job = job_queue.get()
    if job is None:
        print(proc_name + " DONE")
        break
    else:
        result_queue.put(execute(job))
result_queue.put(None)
does the same thing, but far more efficiently.
Queue size caution
You didn't ask about this, but let's cover it before it bites you ;-) By default, a Queue's size is unbounded. If, for example, you add a billion items to a Queue, it will demand enough RAM to hold a billion items. So if your producer can generate work items faster than your consumers can process them, memory use can quickly get out of hand.
Fortunately, that's easy to repair: specify a maximum queue size. For example,
job_queue = mp.Queue(maxsize=10*workers_num)
                     ^^^^^^^^^^^^^^^^^^^^^^^
Then job_queue.put(some_work_item) will block until a consumer reduces the queue's size to below the maximum. That way you can process enormous problems with a queue that needs only a modest amount of RAM.
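A tiny self-contained sketch (with a hypothetical producer, not part of the program above) shows the effect: with maxsize=4, put() blocks whenever four items are already waiting, so the producer can never run far ahead of the consumer and memory stays bounded:

import multiprocessing as mp
import time

def producer(q):
    for i in range(20):
        q.put(i)              # blocks once the queue already holds 4 items
        print("put %d" % i)

if __name__ == '__main__':
    q = mp.Queue(maxsize=4)
    p = mp.Process(target=producer, args=(q,))
    p.start()
    time.sleep(1)             # let the producer fill the queue and block
    for _ in range(20):
        print("got %d" % q.get())
    p.join()

When you run it, the "put" lines stall after the first few until the consumer starts calling get(), which is exactly the back-pressure you want when the input file is huge.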
This answer is from the Stack Overflow question "python - Multiprocessing - reading big input data - program hangs": https://stackoverflow.com/questions/20574810/