Python 多处理性能

标签 python performance multiprocessing

这应该是我的第三个也是最后一个问题,关于我尝试提高我使用 Python 进行的一些统计分析的性能。我有 2 个版本的代码(单核与多处理),我期望通过使用多核来获得性能,因为我希望我的代码解压缩/解包相当多的二进制字符串,遗憾的是我注意到使用多核实际上会降低性能核心。

我想知道是否有人对我观察到的情况有可能的解释(向下滚动到 4 月 16 日的更新以获取更多信息)?

程序的关键部分是函数 numpy_array(+ 在多处理中解码),下面的代码片段(完整代码可通过 pastebin 访问,在下面进一步):

def numpy_array(data, peaks):
    rt_counter=0
    for x in peaks:
        if rt_counter %(len(peaks)/20) == 0:
            update_progress()
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            if (index % 2 == 0):
                data[rt_counter][1][peak_counter][0]=float(buff2)
            else:
                data[rt_counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        rt_counter+=1

多处理版本使用一组函数执行此操作,我将在下面显示键 2:

def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj())

def numpy_array(shared_arr,peaks):
    processors=mp.cpu_count()
    with contextlib.closing(mp.Pool(processes=processors,
                                    initializer=pool_init,
                                    initargs=(shared_arr, ))) as pool:
        chunk_size=int(len(peaks)/processors)
        map_parameters=[]
        for i in range(processors):
            counter = i*chunk_size
            chunk=peaks[i*chunk_size:(i+1)*chunk_size]
            map_parameters.append((chunk, counter))
        pool.map(decode,map_parameters)

def decode ((chunk, counter)):
    data=tonumpyarray(shared_arr).view(
        [('f0','<f4'), ('f1','<f4',(250000,2))])
    for x in chunk:
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            #with shared_arr.get_lock():
            if (index % 2 == 0):
                data[counter][1][peak_counter][0]=float(buff2)
            else:
                data[counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        counter+=1

可以通过这些 pastebin 链接访问完整的程序代码

Pastebin for single core version

Pastebin for multiprocessing version

我在一个包含 239 个时间点和每个时间点约 180k 测量对的文件中观察到的性能是单核约 2.5m,多处理约 3.5。

PS:前两个问题(我第一次尝试并行化):

  1. Python multi-processing
  2. Making my NumPy array shared across processes

-- 4 月 16 日 --

我一直在使用 cProfile 库分析我的程序(在 __main__ 中有 cProfile.run('main()'),这表明有 1 个步骤这让一切都变慢了:

ncalls  tottime  percall  cumtime  percall filename:lineno(function)
23   85.859    3.733   85.859    3.733 {method 'acquire' of 'thread.lock' objects}

我在这里不明白的是 thread.lock 对象在 threading 中使用(据我所知)但不应该在多处理中使用,因为每个核心应该运行单个线程(除了拥有自己的锁定机制),那么这是怎么发生的,为什么单个调用需要 3.7 秒?

最佳答案

共享数据是一个已知的因同步而导致速度减慢的案例。

您能否在多个进程之间拆分您的数据,或者为每个进程提供一个独立的副本?然后,在所有计算完成之前,您的进程不需要同步任何东西。

然后我会让主进程将所有工作处理器的输出加入到一个连贯的集合中。

该方法可能需要额外的 RAM,但现在 RAM 很便宜。

如果您问,我也对每个线程锁获取 3700 毫秒感到困惑。 OTOH 分析可能会误认为像这样的特殊调用。

关于Python 多处理性能,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16019066/

相关文章:

python - 从多个 XBee 系列 2B 端点接收数据

ios - 如何加速或优化 iOS 的 SQLite 查询?

Python 多处理池没有创建足够的进程

c++ - OpenMP:循环遍历 'std::map' 基准(动态调度)

python - 以串行对象为参数的多进程

python - 当尝试运行 dag 和 Airflow 返回状态代码2时

python - 同时执行多个功能

swift - 如何按数量级估计我的示例代码的时间消耗

python - python中的序列匹配算法

php - 总是运行删除查询更有效,还是先检查该信息是否存在?