python - 如何使用所有CPU对大文件列表进行子处理?

标签 python multiprocessing subprocess pool python-3.7

我需要在命令行中使用LaTeXML库将86,000 TEX文件转换为XML。我试图编写一个Python脚本,以利用所有4个内核通过subprocess模块自动执行此操作。

def get_outpath(tex_path):
    path_parts = pathlib.Path(tex_path).parts
    arxiv_id = path_parts[2]
    outpath = 'xml/' + arxiv_id + '.xml'
    return outpath

def convert_to_xml(inpath):
    outpath = get_outpath(inpath)

    if os.path.isfile(outpath):
        message = '{}: Already converted.'.format(inpath)
        print(message)
        return

    try:
        process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath], 
                                   stderr=subprocess.PIPE, 
                                   stdout=subprocess.PIPE)
    except Exception as error:
        process.kill()
        message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
        print(message)

    message = '{}: Converted!'.format(inpath)
    print(message)

def start():
    start_time = time.time()
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count(),
                               maxtasksperchild=1)
    print('Initialized {} threads'.format(multiprocessing.cpu_count()))
    print('Beginning conversion...')
    for _ in pool.imap_unordered(convert_to_xml, preprints, chunksize=5): 
        pass
    pool.close()
    pool.join()
    print("TIME: {}".format(total_time))

start()


该脚本导致Too many open files并降低计算机速度。从活动监视器看,该脚本似乎试图一次创建86,000个转换子进程,并且每个进程都试图打开文件。也许这是pool.imap_unordered(convert_to_xml, preprints)的结果-也许我不需要将map与subprocess.Popen一起使用,因为我要调用的命令太多了?有什么选择?

我花了整整一天的时间试图找出正确的方法来进行批量子处理。我是Python的新手,所以向正确方向前进的任何提示将不胜感激。谢谢!

最佳答案

convert_to_xml中,process = subprocess.Popen(...)语句产生latexml子进程。
没有process.communicate()之类的阻塞调用,即使convert_to_xml继续在后台运行,latexml也会结束。

由于convert_to_xml结束,因此Pool向关联的工作进程发送了另一个要运行的任务,因此再次调用convert_to_xml
再次在后台生成另一个latexml进程。
很快,您就可以在latexml进程中全神贯注,并且已达到打开文件数量的资源限制。

修复很容易:添加process.communicate()告诉convert_to_xml等待直到latexml进程完成。

try:
    process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath], 
                               stderr=subprocess.PIPE, 
                               stdout=subprocess.PIPE)
    process.communicate()                                   
except Exception as error:
    process.kill()
    message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
    print(message)

else: # use else so that this won't run if there is an Exception
    message = '{}: Converted!'.format(inpath)
    print(message)




关于if __name__ == '__main__'

作为martineau pointed out,有一个warning in the multiprocessing docs
不应在模块的顶层调用产生新进程的代码。
相反,该代码应包含在if __name__ == '__main__'语句内。

在Linux中,如果忽略此警告,则不会发生任何可怕的事情。
但是在Windows中,代码为“ fork-bombs”。或更准确地说,代码
导致生成未缓解的子流程链,因为在Windows上,fork是通过生成新的Python流程来模拟的,然后导入调用脚本。每次导入都会产生一个新的Python进程。每个Python进程都会尝试导入调用脚本。直到消耗完所有资源后,该周期才会中断。

因此,对我们的Windows-fork-bereft弟兄们很好,请使用

if __name__ == '__main__:
    start()




有时进程需要大量内存。 The only reliable way释放内存是终止进程。 maxtasksperchild=1告诉pool在完成1个任务后终止每个工作进程。然后,它产生一个新的工作进程来处理另一个任务(如果有的话)。这将释放原始工作人员可能已经分配的(内存)资源,而这些资源否则将无法释放。

在您的情况下,工作进程似乎不需要大量内存,因此您可能不需要maxtasksperchild=1
convert_to_xml中,process = subprocess.Popen(...)语句产生latexml子进程。
没有process.communicate()之类的阻塞调用,即使convert_to_xml继续在后台运行,latexml也会结束。

由于convert_to_xml结束,因此Pool向关联的工作进程发送了另一个要运行的任务,因此再次调用convert_to_xml
再次在后台生成另一个latexml进程。
很快,您就可以在latexml进程中全神贯注,并且已达到打开文件数量的资源限制。

修复很容易:添加process.communicate()告诉convert_to_xml等待直到latexml进程完成。

try:
    process = subprocess.Popen(['latexml', '--dest=' + outpath, inpath], 
                               stderr=subprocess.PIPE, 
                               stdout=subprocess.PIPE)
    process.communicate()                                   
except Exception as error:
    process.kill()
    message = "error: %s run(*%r, **%r)" % (e, args, kwargs)
    print(message)

else: # use else so that this won't run if there is an Exception
    message = '{}: Converted!'.format(inpath)
    print(message)




chunksize影响工作人员在将结果发送回主流程之前执行的任务数量。
Sometimes这可能会影响性能,尤其是在进程间通信是整个运行时的重要部分的情况下。

在您的情况下,convert_to_xml花费相对较长的时间(假设我们等到latexml完成),并且它仅返回None。因此,进程间通信可能不是整个运行时的重要部分。因此,我不希望在这种情况下您会发现性能发生重大变化(尽管对实验没有影响!)。



在普通的Python中,不应仅使用map多次调用一个函数。

出于类似的风格原因,在关心返回值的情况下,我会保留使用pool.*map*方法。

所以代替

for _ in pool.imap_unordered(convert_to_xml, preprints, chunksize=5): 
    pass


您可以考虑使用

for preprint in preprints:
    pool.apply_async(convert_to_xml, args=(preprint, ))


代替。



传递给任何pool.*map*函数的iterable被消耗
立即。迭代器是否为迭代器并不重要。没有
使用此处的迭代器具有特殊的内存优势。 imap_unordered返回一个
迭代器,但是它不会以任何特别对迭代器友好的方式处理其输入
道路。

无论您传递哪种类型的可迭代对象,在调用pool.*map*函数时,可迭代对象都是
消耗并转化为任务,然后将其放入任务队列。

这是证实这一主张的代码:

version1.py:

import multiprocessing as mp
import time

def foo(x):
    time.sleep(0.1)
    return x * x


def gen():
    for x in range(1000):
        if x % 100 == 0:
            print('Got here')
        yield x


def start():
    pool = mp.Pool()
    for item in pool.imap_unordered(foo, gen()):
        pass

    pool.close()
    pool.join()

if __name__ == '__main__':
    start()


version2.py:

import multiprocessing as mp
import time
def foo(x):
    time.sleep(0.1)
    return x * x


def gen():
    for x in range(1000):
        if x % 100 == 0:
            print('Got here')
        yield x


def start():
    pool = mp.Pool()

    for item in gen():
        result = pool.apply_async(foo, args=(item, ))

    pool.close()
    pool.join()

if __name__ == '__main__':
    start()


运行version1.pyversion2.py都会产生相同的结果。

Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here
Got here


至关重要的是,您会发现Got here在以下位置快速打印了10次
运行的开始,然后有很长的暂停(在计算时
完成)。

如果生成器gen()pool.imap_unordered缓慢消耗,
我们应该期望Got here也会缓慢打印。由于Got here
快速打印了10次,我们可以看到正在迭代的gen()
在任务完成之前就完全消耗完了。

运行这些程序有望使您充满信心
pool.imap_unorderedpool.apply_async将任务放入队列
本质上以相同的方式:在调用后立即进行。

关于python - 如何使用所有CPU对大文件列表进行子处理?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54728128/

相关文章:

python - 如何从 pandas DF 获取 `field_name:field_type` 的字典

python - PermissionError : [WinError 5] Access denied

python - 如何从 subprocess.Popen 使用 STDIN

python - 使用 NumPy 快速进行 token 到索引的转换

python - 修改一列的值在原始值后添加一个词

python - Python 中的并发,多进程比单进程慢

linux-kernel - 保护多处理器中的共享内存区域

mysql - Perl、子项和共享数据

python - 使用 mpi4py 在脚本中调用子进程

java - java中相当于python中的time.clock的是什么?