python - 是否有避免内存深拷贝或减少多处理时间的好方法?

标签 python multithreading pandas multiprocessing bigdata

我正在使用Python环境的Pandas模块制作基于内存的“大数据”实时计算模块。

所以响应时间是这个模块的质量,非常关键和重要。

为了处理大型数据集,我拆分数据并并行处理子拆分数据。

在存储子数据结果的部分,花费了很多时间(第21行)。

我认为内部内存深拷贝出现或者传递的子数据没有在内存中共享。

如果我用 C 或 C++ 编写模块,我将使用如下指针或引用。

"process=Process(target=addNewDerivedColumn, args=[resultList, &sub_dataframe])"

"process=Process(target=addNewDerivedColumn, args=[resultList, sub_dataframe])

def addNewDerivedColumn(resultList, split_sub_dataframe&):.... "

是否有避免内存深拷贝或减少多处理时间的好方法? “不优雅”很好。 我准备好让我的代码变脏了。 我尝试了 weekref、RawValue、RawArray、Value、Pool,但都失败了。

该模块正在 MacOS 中开发,最终将在 Linux 或 Unix 中运行。

不考虑 Windows 操作系统。

代码来了。

真正的代码在我的办公室,但结构和逻辑与真正的相同。

1 #-*- coding: UTF-8 -*-' 
2 import pandas as pd
3 import numpy as np
4 from multiprocessing import *
5 import time
6
7
8 def addNewDerivedColumn(resultList, split_sub_dataframe):
9    
10    split_sub_dataframe['new_column']=    np.abs(split_sub_dataframe['column_01']+split_sub_dataframe['column_01']) / 2
11    
12    print split_sub_dataframe.head()
13    
14    '''
15     i think that the hole result of sub-dataframe is copied to resultList, not reference value 
16     and in here time spend much
17     compare elapsed time of comment 21th line with the uncommented one
18     In MS Windows, signifiant difference of elapsed time doesn't show up
19     In Linux or Mac OS, the difference is big
20    '''
21    resultList.append(split_sub_dataframe)
22    
23
24
25 if __name__ == "__main__":
26    
27    # example data generation
28    # the record count of the real data is over 1 billion with about 10 columns.
29    dataframe = pd.DataFrame(np.random.randn(100000000, 4), columns=['column_01', 'column_02', 'column_03', 'column_04'])
30    
31
32    print 'start...'
33    start_time = time.time()
34    
35    # to launch 5 process in parallel, I split the dataframe to five sub-dataframes
36    split_dataframe_list = np.array_split(dataframe, 5)
37    
38    # multiprocessing 
39    manager = Manager()
40    
41    # result list
42    resultList=manager.list()
43    processList=[]
44    
45    for sub_dataframe in split_dataframe_list:
46        process=Process(target=addNewDerivedColumn, args=[resultList, sub_dataframe])
47        processList.append(process)
48        
49    for proc in processList: 
50        proc.start()
51    for proc in processList: 
52        proc.join()
53    
54    
55    print 'elapsed time  : ', np.round(time.time() - start_time,3)

最佳答案

如果将进程间通信保持在一个 最低限度。因此,不是将子数据帧作为参数传递,而是传递 指标值。子进程可以对公共(public) DataFrame 本身进行切片。

当一个子进程被派生时,它会得到一份定义在 父进程的调用模块。因此,如果大型 DataFrame df生成多处理池之前在全局变量中定义,然后每个 生成的子进程将有权访问 df

在 Windows 上,没有 fork() 的地方,一个新的 python 进程被启动并且 导入调用模块。因此,在 Windows 上,生成的子进程必须 从头开始重新生成 df,这可能需要时间和更多内存。

然而,在 Linux 上,您有写时复制。这意味着生成的 subprocess 访问原始全局变量(调用模块的)而不 复制它们。只有当子进程尝试修改全局时,Linux 才会 然后在修改值之前制作一个单独的副本。

所以如果你避免修改你的全局变量,你可以享受性能提升 子流程。我建议仅将子进程用于计算。返回 计算值,让主进程整理结果修改 原始数据框。

import pandas as pd
import numpy as np
import multiprocessing as mp
import time

def compute(start, end):
    sub = df.iloc[start:end]
    return start, end, np.abs(sub['column_01']+sub['column_01']) / 2

def collate(retval):
    start, end, arr = retval
    df.ix[start:end, 'new_column'] = arr

def window(seq, n=2):
    """
    Returns a sliding window (of width n) over data from the sequence
    s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ...
    """
    for i in range(len(seq)-n+1):
        yield tuple(seq[i:i+n])

if __name__ == "__main__":
    result = []
    # the record count of the real data is over 1 billion with about 10 columns.
    N = 10**3
    df = pd.DataFrame(np.random.randn(N, 4),
                      columns=['column_01', 'column_02', 'column_03', 'column_04'])

    pool = mp.Pool()    
    df['new_column'] = np.empty(N, dtype='float')

    start_time = time.time()
    idx = np.linspace(0, N, 5+1).astype('int')
    for start, end in window(idx, 2):
        # print(start, end)
        pool.apply_async(compute, args=[start, end], callback=collate)

    pool.close()
    pool.join()
    print 'elapsed time  : ', np.round(time.time() - start_time,3)
    print(df.head())

关于python - 是否有避免内存深拷贝或减少多处理时间的好方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19615560/

相关文章:

python - 无法在 ipython 笔记本上显示图表

c# - AutoResetEvent.WaitOne 超时与 Thread.Sleep

python - pandas .diff() 但使用第一个单元格作为前一列中最后一个单元格之间的差异

python - 给定条件的矩阵上的 Numpy 高级索引

PYTHON如何将字符串转换为int或float

java - MotionEvent.obtain(…);不像屏幕上的攻丝

ios - 同时2个定时器

python - 在 python pandas 中获取单元格值并填充新列的行

python - 评估 Pandas DataFrame 中 if-then-else block 中的多个条件

python - selenium 不适用于 Firefox 或 Chrome