python - Pandas 和多处理内存管理 : Splitting a DataFrame into Multiple Chunks

标签 python pandas memory multiprocessing python-3.5

我必须逐行处理一个巨大的 pandas.DataFrame(几十 GB),其中每行操作都相当长(几十毫秒)。所以我有了将帧拆分为 block 并使用 multiprocessing 并行处理每个 block 的想法。这确实加快了任务的速度,但内存消耗是一场噩梦。

虽然每个子进程原则上应该只消耗一小块数据,但它需要(几乎)与包含原始 DataFrame 的原始父进程一样多的内存。即使删除父进程中使用过的部分也无济于事。

我写了一个最小的例子来复制这种行为。它所做的唯一一件事就是创建一个带有随机数的大型 DataFrame,将其分成最多 100 行的小块,并在多处理期间简单地打印一些关于 DataFrame 的信息(此处通过大小为 4 的 mp.Pool

并行执行的主函数:

def just_wait_and_print_len_and_idx(df):
    """Waits for 5 seconds and prints df length and first and last index"""
    # Extract some info
    idx_values = df.index.values
    first_idx, last_idx = idx_values[0], idx_values[-1]
    length = len(df)
    pid = os.getpid()

    # Waste some CPU cycles
    time.sleep(1)

    # Print the info
    print('First idx {}, last idx {} and len {} '
          'from process {}'.format(first_idx, last_idx, length, pid))

DataFrame 分成小块的辅助生成器:

def df_chunking(df, chunksize):
    """Splits df into chunks, drops data of original df inplace"""
    count = 0 # Counter for chunks
    while len(df):
        count += 1
        print('Preparing chunk {}'.format(count))
        # Return df chunk
        yield df.iloc[:chunksize].copy()
        # Delete data in place because it is no longer needed
        df.drop(df.index[:chunksize], inplace=True)

以及主要例程:

def main():
    # Job parameters
    n_jobs = 4  # Poolsize
    size = (10000, 1000)  # Size of DataFrame
    chunksize = 100  # Maximum size of Frame Chunk

    # Preparation
    df = pd.DataFrame(np.random.rand(*size))
    pool = mp.Pool(n_jobs)

    print('Starting MP')

    # Execute the wait and print function in parallel
    pool.imap(just_wait_and_print_len_and_idx, df_chunking(df, chunksize))

    pool.close()
    pool.join()

    print('DONE')

标准输出是这样的:

Starting MP
Preparing chunk 1
Preparing chunk 2
First idx 0, last idx 99 and len 100 from process 9913
First idx 100, last idx 199 and len 100 from process 9914
Preparing chunk 3
First idx 200, last idx 299 and len 100 from process 9915
Preparing chunk 4
...
DONE

问题:

主进程需要大约 120MB 的内存。但是,池的子进程需要相同数量的内存,尽管它们只包含原始 DataFame 的 1%(大小为 100 的 block 与原始长度为 10000 的 block ) .为什么?

我该怎么办?尽管我进行了分块,Python (3) 是否会将整个 DataFrame 发送到每个子进程?这是 pandas 内存管理的问题还是 multiprocessing 和数据 pickling 的错误?谢谢!



如果您想自己尝试,只需复制和粘贴整个脚本:

import multiprocessing as mp
import pandas as pd
import numpy as np
import time
import os


def just_wait_and_print_len_and_idx(df):
    """Waits for 5 seconds and prints df length and first and last index"""
    # Extract some info
    idx_values = df.index.values
    first_idx, last_idx = idx_values[0], idx_values[-1]
    length = len(df)
    pid = os.getpid()

    # Waste some CPU cycles
    time.sleep(1)

    # Print the info
    print('First idx {}, last idx {} and len {} '
          'from process {}'.format(first_idx, last_idx, length, pid))


def df_chunking(df, chunksize):
    """Splits df into chunks, drops data of original df inplace"""
    count = 0 # Counter for chunks
    while len(df):
        count += 1
        print('Preparing chunk {}'.format(count))
        # Return df chunk
        yield df.iloc[:chunksize].copy()
        # Delete data in place because it is no longer needed
        df.drop(df.index[:chunksize], inplace=True)


def main():
    # Job parameters
    n_jobs = 4  # Poolsize
    size = (10000, 1000)  # Size of DataFrame
    chunksize = 100  # Maximum size of Frame Chunk

    # Preparation
    df = pd.DataFrame(np.random.rand(*size))
    pool = mp.Pool(n_jobs)

    print('Starting MP')

    # Execute the wait and print function in parallel
    pool.imap(just_wait_and_print_len_and_idx, df_chunking(df, chunksize))

    pool.close()
    pool.join()

    print('DONE')


if __name__ == '__main__':
    main()

最佳答案

好的,所以我在 Sebastian Opałczyński 在评论中的提示后弄明白了。

问题在于子进程是从父进程派生出来的,因此它们都包含对原始 DataFrame 的引用。然而,框架是在原始进程中操作的,因此写时复制行为会缓慢地杀死整个过程,最终会在达到物理内存的限制时结束。

有一个简单的解决方案:我使用 multiprocessing 的新上下文功能代替 pool = mp.Pool(n_jobs):

ctx = mp.get_context('spawn')
pool = ctx.Pool(n_jobs)

这保证了 Pool 进程是刚刚生成的,而不是从父进程派生出来的。因此,它们都无法访问原始 DataFrame,并且它们都只需要父级内存的一小部分。

请注意,mp.get_context('spawn') 仅适用于 Python 3.4 及更新版本。

关于python - Pandas 和多处理内存管理 : Splitting a DataFrame into Multiple Chunks,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41240067/

相关文章:

python - 拉取 MS Access 表并将它们放入 python 中的数据框中

python - 如何处理 Pandas 中的 SettingWithCopyWarning

c++ - 为什么 memset 在二维数组中正确填充 0 但无法在该数组中填充 1?

Python 正则表达式意外行为

python - 恢复使用迭代器的 Tensorflow 模型

python - Pylint 警告只出现在 Jenkins 上

python - 当A上存在重复值时,如何找到A列上分组数据的最小值和另一列B上的最小值

java - 存储方法(堆栈/堆)中的局部最终变量在哪里?

java - 我的 Java 内存去哪儿了?

javascript - Django 运行 Python 脚本并将输出传递给 javascript 文件