Before anyone marks this as a duplicate: I have been going through StackOverflow posts for days now, and I haven't really found a good or satisfying answer.

I have a program that at some point takes individual strings (along with many other arguments and objects), does some complicated processing on them, and returns 1 or more strings. Since each string is processed separately, using multiprocessing seems natural here, especially as I am working on machines with over 100 cores.

The following is a minimal example. It works with up to 12 to 15 cores; if I try to give it more, it gets stuck at p.join(). I know it is stuck at the join because I tried adding debug prints before and after it, and it stops somewhere between the two print statements.

Minimal example:
```python
import os, random, sys, time, string
import multiprocessing as mp

letters = string.ascii_uppercase
align_len = 1300

def return_string(queue):
    n_strings = [1,2,3,4]
    alignments = []
    # generating 1 to 4 sequences randomly, each sequence of length 1300
    # the original code might even produce more than 4, but 1 to 4 is an average case
    # instead of the random string there will be some complicated function called
    # in the original code
    for i in range(random.choice(n_strings)):
        alignment = ""
        for i in range(align_len):
            alignment += random.choice(letters)
        alignments.append(alignment)
    for a in alignments:
        queue.put(a)

def run_string_gen(cores):
    processes = []
    queue = mp.Queue()
    # running the target function 1000 time
    for i in range(1000):
        # print(i)
        process = mp.Process(target=return_string, args = (queue,))
        processes.append(process)
        if len(processes) == cores:
            counter = len(processes)
            for p in processes:
                p.start()
            for p in processes:
                p.join()
            while queue.qsize() != 0:
                a = queue.get()
                # the original idea is that instead of print
                # I will be writing to a file that is already open
                print(a)
            processes = []
            queue = mp.Queue()
    # any leftovers processes
    if processes:
        for p in processes:
            p.start()
        for p in processes:
            p.join()
        while queue.qsize() != 0:
            a = queue.get()
            print(a)

if __name__ == "__main__":
    cores = int(sys.argv[1])
    if cores > os.cpu_count():
        cores = os.cpu_count()
    start = time.perf_counter()
    run_string_gen(cores)
    print(f"it took {time.perf_counter() - start}")
```
My suspicion is that the queue is full, but it isn't that many strings: with 20 cores it hangs, and that is only about 20 * 4 = 80 strings (if the choice always came out 4). Can that many strings really fill the queue?

Assuming the queue is full, I'm not sure at which point I should check and empty it. Doing it inside return_string seems like a bad idea, since the other processes share the same queue and could be emptying/filling it at the same time. Should I use lock.acquire() and lock.release() there?
The strings are ultimately appended to a file, so I could avoid the queue and write them to the file directly. However, since starting a process means its arguments get copied (pickled), I can't pass the _io.TextIOWrapper object (the open file to append to). I would instead have to open and close the file inside return_string, synchronizing with lock.acquire() and lock.release(), but constantly opening and closing the output file just to write to it seems wasteful.
Some suggested solutions I found:

1 - Draining the queue before joining. However, I can't predict how long each process will take, and adding a sleep after the p.start() loop and before p.join() is bad (at least for my code): if the workers finish quickly I end up wasting time waiting, and the whole point is to get a speedup.
2 - Adding some kind of sentinel value, e.g. None, to signal that a worker is finished. But I don't get this part: if I run the target function 10 times for 10 cores, I'll have 10 sentinels, yet the program hangs before it ever gets to emptying the queue and checking for them.
Any suggestions or ideas on what to do here?
Best answer
Read the documentation for `multiprocessing.Queue` carefully, in particular the second warning, which says in part:

> Warning: As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe.
>
> This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

Put simply, your program violates this by joining the processes before reading the items off the queue; you must reverse the order of those operations. The question then becomes: with the child processes still running and writing to the queue, how does the main process know when to stop reading? The simplest solution is to have each child process write one special sentinel record as its final item, signaling that it will write no further items. The main process then simply does blocking reads until it has seen N sentinel records, where N is the number of processes it started that write to the queue. The sentinel must be a unique value that cannot be mistaken for a normal item to be processed; None suffices for this purpose:
```python
import os, random, sys, time, string
import multiprocessing as mp

letters = string.ascii_uppercase
align_len = 1300

SENTINEL = None # no more records sentinel

def return_string(queue):
    n_strings = [1,2,3,4]
    alignments = []
    # generating 1 to 4 sequences randomly, each sequence of length 1300
    # the original code might even produce more than 4, but 1 to 4 is an average case
    # instead of the random string there will be some complicated function called
    # in the original code
    for i in range(random.choice(n_strings)):
        alignment = ""
        for i in range(align_len):
            alignment += random.choice(letters)
        alignments.append(alignment)
    for a in alignments:
        queue.put(a)
    # show this process is through writing records:
    queue.put(SENTINEL)

def run_string_gen(cores):
    processes = []
    queue = mp.Queue()
    # running the target function 1000 time
    for i in range(1000):
        # print(i)
        process = mp.Process(target=return_string, args = (queue,))
        processes.append(process)
        if len(processes) == cores:
            counter = len(processes)
            for p in processes:
                p.start()
            seen_sentinel_count = 0
            while seen_sentinel_count < len(processes):
                a = queue.get()
                if a is SENTINEL:
                    seen_sentinel_count += 1
                # the original idea is that instead of print
                # I will be writing to a file that is already open
                else:
                    print(a)
            for p in processes:
                p.join()
            processes = []
            # The same queue can be reused:
            #queue = mp.Queue()
    # any leftovers processes
    if processes:
        for p in processes:
            p.start()
        seen_sentinel_count = 0
        while seen_sentinel_count < len(processes):
            a = queue.get()
            if a is SENTINEL:
                seen_sentinel_count += 1
            else:
                print(a)
        for p in processes:
            p.join()

if __name__ == "__main__":
    cores = int(sys.argv[1])
    if cores > os.cpu_count():
        cores = os.cpu_count()
    start = time.perf_counter()
    run_string_gen(cores)
    print(f"it took {time.perf_counter() - start}")
```
Prints:
...
NEUNBZVXNHCHVIGNDCEUXJSINEJQNCOWBMUJRTIASUEJHDJUWZIYHHZTJJSJXALZHOEVGMHSVVMMIFZGLGLJDECEWSVZCDRHZWVOMHCDLJVQLQIQCVKBEVOVDWTMFPWIWIQFOGWAOPTJUWKAFBXPWYDIENZTTJNFAEXDVZHXHJPNFDKACCTRTOKMVDGBQYJQMPSQZKDNDYFVBCFMWCSCHTVKURPJDBMRWFQAYIIALHDJTTMSIAJAPLHUAJNMHOKLZNUTRWWYURBTVQHWECAFHQPOZZLVOQJWVLFXUEQYKWEFXQPHKRRHBBCSYZOHUDIFOMBSRNDJNBHDUYMXSMKUOJZUAPPLOFAESZXIETOARQMBRYWNWTSXKBBKWYYKDNLZOCPHDVNLONEGMALL
it took 32.7125509
Update

The same code done using a multiprocessing pool, which avoids re-creating the processes:
```python
import os, random, sys, time, string
import multiprocessing as mp

letters = string.ascii_uppercase
align_len = 1300

SENTINEL = None # no more records sentinel

def return_string():
    n_strings = [1,2,3,4]
    alignments = []
    # generating 1 to 4 sequences randomly, each sequence of length 1300
    # the original code might even produce more than 4, but 1 to 4 is an average case
    # instead of the random string there will be some complicated function called
    # in the original code
    for i in range(random.choice(n_strings)):
        alignment = ""
        for i in range(align_len):
            alignment += random.choice(letters)
        alignments.append(alignment)
    return alignments

def run_string_gen(cores):
    def my_callback(result):
        alignments = result
        for alignment in alignments:
            print(alignment)

    pool = mp.Pool(cores)
    for i in range(1000):
        pool.apply_async(return_string, callback=my_callback)
    # wait for completion of all tasks:
    pool.close()
    pool.join()

if __name__ == "__main__":
    cores = int(sys.argv[1])
    if cores > os.cpu_count():
        cores = os.cpu_count()
    start = time.perf_counter()
    run_string_gen(cores)
    print(f"it took {time.perf_counter() - start}")
```
Prints:
...
OMCRIHWCNDKYBZBTXUUYAGCMRBMOVTDOCDYFGRODBWLIFZZBDGEDVAJAJFXWJRFGQXTSCCJLDFKMOENGAGXAKKFSYXEQOICKWFPSKOHIMCRATLVLVLMGFAWBDIJMZMVMHCXMTVJBSWXTLDHEWYHUMSQZGGFWRMOHKKKGMTFEOTTJDOQMOWWLKTOWHKCIUNINHTGUZHTBGHROPVKQBNEHQWIDCZUOJGHUXLLDGHCNWIGFUCAQAZULAEZPIP
it took 2.1607988999999996
Regarding "python - multiprocessing stuck at join", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/70465276/