python - 多处理更改每个进程的当前文件夹

标签 python multithreading multiprocessing

我正在尝试使用 multiprocessing 模块并行化一些计算。

如何确定 multiprocessing.Pool.map_async 生成的每个进程都在不同的(先前创建的)文件夹上运行?

问题是每个进程都会调用一些将临时文件写入磁盘的第三方库,如果您在同一个文件夹中运行许多库,就会把一个与另一个搞混。

此外,我无法为 map_async 进行的每个函数调用都创建一个新文件夹,而是我想创建尽可能少的文件夹(即每个进程一个)。

代码类似于:

import multiprocessing,os,shutil
processes=16

#starting pool
pool=multiprocessing.Pool(processes)

#The asked dark-magic here?

devshm='/dev/shm/'
#Creating as many folders as necessary
for p in range(16):
    os.mkdir(devshm+str(p)+'/')
    shutil.copy(some_files,p)

def example_function(i):
    print os.getcwd()
    return i*i
result=pool.map_async(example_function,range(1000))

这样在任何时候,example_function 的每次调用都在不同的文件夹上执行。

我知道一个解决方案可能是使用子进程来产生不同的进程,但我想坚持使用多处理(我需要为每个产生的子进程腌制一些对象,写入磁盘,读取,取消腌制,而不是通过函数调用传递对象本身(使用 functools.partial)。

附言。

This question在某种程度上是相似的,但该解决方案并不能保证每个函数调用都发生在不同的文件夹中,这确实是我的目标。

最佳答案

由于您没有在问题中指定,我假设您在函数执行完成后不需要目录的内容。

绝对最简单的方法是在使用它们的函数中创建和销毁临时目录。这样,您的其余代码就不会关心工作进程的环境/目录,并且 Pool 非常适合。我还会使用 python 的内置功能来创建临时目录:

import multiprocessing, os, shutil, tempfile
processes=16

def example_function(i):
    with tempfile.TemporaryDirectory() as path:
        os.chdir(path)
        print(os.getcwd())
        return i*i

if __name__ == '__main__':
    #starting pool
    pool=multiprocessing.Pool(processes)

    result=pool.map(example_function,range(1000))

注意: tempfile.TemporaryDirectory是在 python 3.2 中引入的。如果您使用的是旧版本的 python,您可以 copy the wrapper class into your code .

如果您真的需要预先设置目录...

尝试使用 Pool 进行这项工作有点麻烦。您可以传递要与数据一起使用的目录名称,但您只能传递等于目录数的初始数量。然后,您需要使用 imap_unordered 之类的东西来查看结果何时完成(并且它的目录可供重用)。

在我看来,更好的方法是根本不使用 Pool,而是创建单独的 Process 对象并将每个对象分配给一个目录。如果您需要控制 Process 环境的某些部分,这通常会更好,而当您的问题是数据驱动且不关心时,Pool 通常会更好过程或其环境。

有多种方法可以将数据传入/传出 Process 对象,但最简单的方法是队列:

import multiprocessing,os,shutil
processes=16

in_queue = multiprocessing.Queue()
out_queue = multiprocessing.Queue()

def example_function(path, qin, qout):
    os.chdir(path)

    for i in iter(qin.get, 'stop'):
        print(os.getcwd())
        qout.put(i*i)


devshm='/dev/shm/'
# create processes & folders
procs = []
for i in range(processes):
    path = devshm+str(i)+'/'
    os.mkdir(path)
    #shutil.copy(some_files,path)

    procs.append(multiprocessing.Process(target=example_function, args=(path,in_queue, out_queue)))
    procs[-1].start()


# send input
for i in range(1000):
    in_queue.put(i)
# send stop signals
for i in range(processes):
    in_queue.put('stop')

# collect output    
results = []
for i in range(1000):
    results.append(out_queue.get())

关于python - 多处理更改每个进程的当前文件夹,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31021163/

相关文章:

Python将字典传递给多处理中的进程

多处理可能导致 Python defaultdict 行为?

python - 在多处理期间保持统一计数?

python - 美汤加工

python - 对 Django 模型请求的数据进行简单操作

c++ - 线程同步

java - 任务管理器及其与 Java 进程的相关性

Python:将对象存储在二维数组中并调用其方法

python - 如何将groupby中的行与多个条件组合起来?

c# - 线程同步 : shared resources and actions with different resource number demand