我正在尝试使用 multiprocessing
模块并行化一些计算。
如何确定 multiprocessing.Pool.map_async
生成的每个进程都在不同的(先前创建的)文件夹上运行?
问题是每个进程都会调用一些将临时文件写入磁盘的第三方库,如果您在同一个文件夹中运行许多库,就会把一个与另一个搞混。
此外,我无法为 map_async 进行的每个函数调用都创建一个新文件夹,而是我想创建尽可能少的文件夹(即每个进程一个)。
代码类似于:
import multiprocessing,os,shutil
processes=16
#starting pool
pool=multiprocessing.Pool(processes)
#The asked dark-magic here?
devshm='/dev/shm/'
#Creating as many folders as necessary
for p in range(16):
os.mkdir(devshm+str(p)+'/')
shutil.copy(some_files,p)
def example_function(i):
print os.getcwd()
return i*i
result=pool.map_async(example_function,range(1000))
这样在任何时候,example_function 的每次调用都在不同的文件夹上执行。
我知道一个解决方案可能是使用子进程来产生不同的进程,但我想坚持使用多处理(我需要为每个产生的子进程腌制一些对象,写入磁盘,读取,取消腌制,而不是通过函数调用传递对象本身(使用 functools.partial)。
附言。
This question在某种程度上是相似的,但该解决方案并不能保证每个函数调用都发生在不同的文件夹中,这确实是我的目标。
最佳答案
由于您没有在问题中指定,我假设您在函数执行完成后不需要目录的内容。
绝对最简单的方法是在使用它们的函数中创建和销毁临时目录。这样,您的其余代码就不会关心工作进程的环境/目录,并且 Pool
非常适合。我还会使用 python 的内置功能来创建临时目录:
import multiprocessing, os, shutil, tempfile
processes=16
def example_function(i):
with tempfile.TemporaryDirectory() as path:
os.chdir(path)
print(os.getcwd())
return i*i
if __name__ == '__main__':
#starting pool
pool=multiprocessing.Pool(processes)
result=pool.map(example_function,range(1000))
注意: tempfile.TemporaryDirectory
是在 python 3.2 中引入的。如果您使用的是旧版本的 python,您可以 copy the wrapper class into your code .
如果您真的需要预先设置目录...
尝试使用 Pool
进行这项工作有点麻烦。您可以传递要与数据一起使用的目录名称,但您只能传递等于目录数的初始数量。然后,您需要使用 imap_unordered
之类的东西来查看结果何时完成(并且它的目录可供重用)。
在我看来,更好的方法是根本不使用 Pool
,而是创建单独的 Process
对象并将每个对象分配给一个目录。如果您需要控制 Process
环境的某些部分,这通常会更好,而当您的问题是数据驱动且不关心时,Pool
通常会更好过程或其环境。
有多种方法可以将数据传入/传出 Process 对象,但最简单的方法是队列:
import multiprocessing,os,shutil
processes=16
in_queue = multiprocessing.Queue()
out_queue = multiprocessing.Queue()
def example_function(path, qin, qout):
os.chdir(path)
for i in iter(qin.get, 'stop'):
print(os.getcwd())
qout.put(i*i)
devshm='/dev/shm/'
# create processes & folders
procs = []
for i in range(processes):
path = devshm+str(i)+'/'
os.mkdir(path)
#shutil.copy(some_files,path)
procs.append(multiprocessing.Process(target=example_function, args=(path,in_queue, out_queue)))
procs[-1].start()
# send input
for i in range(1000):
in_queue.put(i)
# send stop signals
for i in range(processes):
in_queue.put('stop')
# collect output
results = []
for i in range(1000):
results.append(out_queue.get())
关于python - 多处理更改每个进程的当前文件夹,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31021163/