Python multiprocessing

Tags: python, multiprocessing

I have a large list containing binary-encoded strings, which I used to process in a single function like this:

""" just included this to demonstrate the 'data' structure """
data=np.zeros(250,dtype='float32, (250000,2)float32')

def numpy_array(data, peaks):
    rt_counter=0
    for x in peaks:
        if rt_counter %(len(peaks)/20) == 0:
            update_progress()
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            if (index % 2 == 0):
                data[rt_counter][1][peak_counter][0]=float(buff2)
            else:
                data[rt_counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        rt_counter+=1
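
(As an aside, the inner pack/unpack loop reinterprets each big-endian 32-bit word as a float one value at a time; a minimal sketch of the same reinterpretation done in a single NumPy call, assuming the interleaved m/z-intensity layout above:)

# Sketch: decode one peak string in one call instead of the per-value
# pack/unpack round trip. Assumes big-endian 32-bit floats stored as
# interleaved (m/z, intensity) pairs, as in the loop above.
decoded = np.frombuffer(base64.b64decode(x), dtype='>f4')
pairs = decoded.reshape(-1, 2)                # column 0: m/z, column 1: intensity
data[rt_counter][1][:len(pairs)] = pairs      # NumPy converts byte order on assignment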

I have been reading about multiprocessing and decided to give it a try, to see whether I could get a large performance increase. I rewrote my function as two (a helper and a "caller"), as follows:

def numpy_array(data, peaks):
    processors=mp.cpu_count() #Might as well throw this directly in the mp.Pool (just for clarity for now)
    pool = mp.Pool(processes=processors)
    chunk_size=len(peaks)/processors
    for i in range(processors):
        counter = i*chunk_size
        chunk=peaks[i*chunk_size:(i+1)*chunk_size-1]
        pool.map(decode(data,chunk,counter))

def decode(data,chunk,counter):
    for x in chunk:
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            if (index % 2 == 0):
                data[counter][1][peak_counter][0]=float(buff2)
            else:
                data[counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        print data[counter][1][10][0]
        counter+=1      

The program runs, but only uses 100-110% of CPU (according to top), and once it finishes it throws TypeError: map() takes at least 3 arguments (2 given) at me. Could anyone with more experience with multiprocessing give me a hint as to what things to look out for here (that could cause the TypeError), and what might be causing my low CPU usage?

-- Code after incorporating the answer --

def decode((data,chunk,counter)):
    print len(chunk), counter
    for x in chunk:
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            if (index % 2 == 0):
                data[counter][1][peak_counter][0]=float(buff2)
            else:
                data[counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        counter+=1

def numpy_array(data, peaks):
    """Fills the NumPy array 'data' with m/z-intensity values acquired
    from b64 decoding and unpacking the binary string read from the 
    mzXML file, which is stored in the list 'peaks'.

    The m/z values are assumed to be ordered without validating this
    assumption.

    Note: This function uses multi-processing
    """
    processors=mp.cpu_count()
    pool = mp.Pool(processes=processors)
    chunk_size=int(len(peaks)/processors)
    map_parameters=[]
    for i in range(processors):
        counter = i*chunk_size
        chunk=peaks[i*chunk_size:(i+1)*chunk_size-1]
        map_parameters.append((data,chunk,counter))
    pool.map(decode,map_parameters) 

This latest version "works" so far, in that it fills the array inside the processes (where the array does contain values), but once all the processes are done, accessing the array yields only zeros, because each process gets its own local copy of the array.
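
(One way around this, since each worker only mutates its own copy of data: have decode return its decoded chunk and let the parent, which owns data, write the returned pieces into the array. A minimal sketch, assuming the big-endian float layout from the question; this is not the original code:)

def decode((chunk, counter)):                  # worker returns results instead of mutating a copy
    rows = []
    for x in chunk:
        decoded = np.frombuffer(base64.b64decode(x), dtype='>f4')
        rows.append(decoded.reshape(-1, 2))    # interleaved (m/z, intensity) pairs
    return counter, rows                       # tell the parent where this chunk starts

def numpy_array(data, peaks):
    processors = mp.cpu_count()
    pool = mp.Pool(processes=processors)
    chunk_size = int(len(peaks) / processors)
    params = [(peaks[i * chunk_size:(i + 1) * chunk_size], i * chunk_size)
              for i in range(processors)]
    for counter, rows in pool.map(decode, params):
        for offset, pairs in enumerate(rows):  # the parent owns 'data', so these writes persist
            data[counter + offset][1][:len(pairs)] = pairs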

Best Answer

Something like this should work:

Note that pool.map takes a function and a list of arguments for that function. In your original example you are just calling the function yourself inside numpy_array.

The function must take exactly one argument, hence the packing of the arguments into a tuple and the rather odd-looking double parentheses in decode (this is called tuple unpacking).

def numpy_array(data, peaks):
    processors=4
    pool = mp.Pool(processes=processors)
    chunk_size=len(data)/processors
    print range(processors)
    map_parameters = [] # new
    for i in range(processors):
        counter = i*chunk_size
        chunk=peaks[i*chunk_size:(i+1)*chunk_size-1]
        map_parameters.append((data,chunk,counter)) # new
    pool.map(decode, map_parameters) # new

def decode((data,chunk,counter)): # changed
    for x in chunk:
        peak_counter=0
        data_buff=base64.b64decode(x)
        buff_size=len(data_buff)/4
        unpack_format=">%dL" % buff_size
        index=0
        for y in struct.unpack(unpack_format,data_buff):
            buff1=struct.pack("I",y)
            buff2=struct.unpack("f",buff1)[0]
            if (index % 2 == 0):
                data[counter][1][peak_counter][0]=float(buff2)
            else:
                data[counter][1][peak_counter][1]=float(buff2)
                peak_counter+=1
            index+=1
        print data[counter][1][10][0]
        counter+=1

Regarding Python multiprocessing, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/15966157/
