Python 在并行处理大文件时不释放 RAM

标签 python python-3.x memory-management parallel-processing

我有一个 25Gb 的纯文本文件,大约有 1000 万行,每行几百个单词。每行都需要单独处理,我试图将 block 拆分为十几个工作人员以并行处理。目前一次加载一百万行(由于某种原因,这占用了 ~10Gb 内存,即使它在磁盘上只有 ~3Gb 未压缩,)将它平均分成 12 种方式,然后使用 multiprocessing.Pool 将它映射到 12 个工作人员。

问题是,当我的 12 个工作人员中的每一个完成处理分配给他们的数据时,他们的 RAM 没有被释放,只会在下一个百万行迭代中再增加 ~10Gb。

我试过“删除”以前的数据,将以前的数据重置为空分配,删除后使用 eval()、gc.collect() 创建可迭代变量名,并将 IO 完全分离到它自己的功能,都没有运气和完全相同的问题。运行调试显示 python 解释器仅识别预期的数据,并且无法访问上一次迭代的数据,那么为什么 RAM 实际上没有被释放?

下面的代码是我尝试分离所有环境的最新迭代,不是最有效的,但“BigFileOnDisk”在 SSD 上,因此与实际处理数据相比,每次重新读取文件可以忽略不计。以前在分配函数中有“读取”功能,在工作人员完成后删除所有数据,结果相同。

def allocation():
    fileCompleted = False
    currentLine = 0
    while not fileCompleted:
        lineData, currentLine, fileCompleted = read(numLines=1000000, startLine=currentLine)
        list_of_values(function_object=worker, inputs=lineData, workers=12)


def read(numLines, startLine=0):
    currentLine = 0
    lines = []
    with open(BigFileOnDisk, 'r') as fid:
        for line in fid:
            if currentLine >= startLine:
                lines.append(line)
            if currentLine - startLine >= numLines:
                return lines, counter, False
            currentLine += 1
        # or if we've hit the end of the file
        return lines, counter, True


def worker(lines):
    outputPath = *root* + str(datetime.datetime.now().time())
    processedData = {}

    for line in lines:
        # process data

    del lines
    with open(outputPath, 'a') as fid:
        for item in processedData:
            fid.write(str(item) + ', ' + str(processedData[item]) + '\n')


def list_of_values(function_object, inputs, workers = 10):
    inputs_split = []
    subsection_start = 0
    for n in range(workers):
        start = int(subsection_start)
        end = int(subsection_start + len(inputs) / workers)
        subsection_start = end

        inputs_split.append( inputs[start:end] )

    p = Pool(workers)
    p.map(function_object, inputs_split)

最佳答案

您没有加入子进程。在 list_of_values 完成后,由 Pool 创建的进程仍然存在(有点像僵尸,但父进程仍然存在)。他们仍然持有他们所有的值(value)观。你在 main 中看不到他们的数据,因为它在另一个进程中(出于同样的原因 gc.collect 不工作)。

要释放工作人员分配的内存,您需要手动加入 Pool 或使用 with

def list_of_values(function_object, inputs, workers = 10):
    inputs_split = []
    subsection_start = 0
    for n in range(workers):
        start = int(subsection_start)
        end = int(subsection_start + len(inputs) / workers)
        subsection_start = end

        inputs_split.append( inputs[start:end] )

    with Pool(workers) as p:
        p.map(function_object, inputs_split)

关于Python 在并行处理大文件时不释放 RAM,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38470123/

相关文章:

python - 在 Django 数据库中查找电话号码后添加其他字段

python - Tensorflow如何检查张量行是否只有零?

python - 如何使用 while + for 循环计算 Big O

python-3.x - 基于 panda 数据框创建嵌套字典

c++ - 嵌套容器的自定义分配器

c - 为什么在为存储要在程序中使用的 main 参数的指针变量赋值之前不分配空间?

iphone - iOS 中的内存保留和泄漏

python - 使用 Tensorflow 计算 MNIST 图像的熵

python - Flask-SQLAlchemy:照片列类型

php - 您如何使用脚本语言(PHP、Python 等)来提高您的生产力?