python - Multiprocessing passing an array of dictionaries through shared memory

Tags: python arrays python-2.7 dictionary multiprocessing

The following code works, but it is very slow because of the large dataset being passed to each process. In my real implementation, creating the processes and sending the data takes almost as long as the calculation itself, so by the time the second process has been created the first one is nearly finished, which makes the parallelization pointless.

The code is the same as in the question Multiprocessing has cutoff at 992 integers being joined as result; the changes suggested there work and are implemented below. However, I am running into what I assume is a common problem: handing large amounts of data to the workers takes a very long time.

I have seen answers that pass shared-memory arrays with multiprocessing.Array. My data is a list of about 4000 entries, but each entry is a dictionary containing 200 key/value pairs. Each process only reads the data, does some calculations, and then returns a matrix (4000x3, no dictionaries).

Answers such as Is shared readonly data copied to different processes for Python multiprocessing? use map. Is it possible to keep the structure below and still use shared memory? Is there an efficient way to send an array of dictionaries to each process, for example by wrapping the dictionaries in some kind of manager and then putting that into a multiprocessing.Array?

import multiprocessing

def main():
    data = {}
    total = []
    for j in range(0,3000):
        total.append(data)
        for i in range(0,200):
            data[str(i)] = i

    CalcManager(total,start=0,end=3000)

def CalcManager(myData,start,end):
    print 'in calc manager'
    #Multi processing
    #Set the number of processes to use.  
    nprocs = 3
    #Initialize the multiprocessing queue so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    #Setup an empty array to store our processes
    procs = []
    #Divide up the data for the set number of processes 
    interval = (end-start)/nprocs 
    new_start = start
    #Create all the processes while dividing the work appropriately
    for i in range(nprocs):
        print 'starting processes'
        new_end = new_start + interval
        #Make sure we dont go past the size of the data 
        if new_end > end:
            new_end = end 
        #Generate a new process and pass it the arguments 
        data = myData[new_start:new_end]
        #Create the processes and pass the data and the result queue
        p = multiprocessing.Process(target=multiProcess,args=(data,new_start,new_end,result_q,i))
        procs.append(p)
        p.start()
        #Increment our next start to the current end 
        new_start = new_end+1
    print 'finished starting'    

    #Print out the results
    for i in range(nprocs):
        result = result_q.get()
        print result

    #Join the processes to wait for all data/processes to be finished
    for p in procs:
        p.join()

#MultiProcess Handling
def multiProcess(data,start,end,result_q,proc_num):
    print 'started process'
    results = []
    temp = []
    for i in range(0,22):
        results.append(temp)
        for j in range(0,3):
            temp.append(j)
    result_q.put(results)
    return

if __name__== '__main__':   
    main()

Solved

Simply putting the list of dictionaries into a Manager solved the problem.

from multiprocessing import Manager

manager = Manager()
d = manager.list(myData)

A manager holding a list apparently also manages the dictionaries contained in that list. Startup is still a little slow, so it looks like the data is still being copied, but the copy happens only once at the start; inside each process the data is then sliced as needed.

import multiprocessing
import multiprocessing.sharedctypes as mt
from multiprocessing import Process, Lock, Manager
from ctypes import Structure, c_double

def main():
    data = {}
    total = []
    for j in range(0,3000):
        total.append(data)
        for i in range(0,100):
            data[str(i)] = i

    CalcManager(total,start=0,end=500)

def CalcManager(myData,start,end):
    print 'in calc manager'
    print type(myData[0])

    manager = Manager()
    d = manager.list(myData)

    #Multi processing
    #Set the number of processes to use.  
    nprocs = 3
    #Initialize the multiprocessing queue so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    #Setup an empty array to store our processes
    procs = []
    #Divide up the data for the set number of processes 
    interval = (end-start)/nprocs 
    new_start = start
    #Create all the processes while dividing the work appropriately
    for i in range(nprocs):
        new_end = new_start + interval
        #Make sure we dont go past the size of the data 
        if new_end > end:
            new_end = end 
        #Generate a new process and pass it the arguments
        #Note that the raw slice below is unused; the Manager proxy d is what
        #actually gets passed to the workers
        data = myData[new_start:new_end]
        #Create the processes and pass the list proxy and the result queue
        p = multiprocessing.Process(target=multiProcess,args=(d,new_start,new_end,result_q,i))
        procs.append(p)
        p.start()
        #Increment our next start to the current end 
        new_start = new_end+1
    print 'finished starting'    

    #Print out the results
    for i in range(nprocs):
        result = result_q.get()
        print len(result)

    #Join the processes to wait for all data/processes to be finished
    for p in procs:
        p.join()

#MultiProcess Handling
def multiProcess(data,start,end,result_q,proc_num):
    #print 'started process'
    results = []
    temp = []
    data = data[start:end]
    for i in range(0,22):
        results.append(temp)
        for j in range(0,3):
            temp.append(j)
    print len(data)        
    result_q.put(results)
    return

if __name__ == '__main__':
    main()

Best Answer

You may see some improvement by using multiprocessing.Manager to store your list in a manager server, and having each child process access items by pulling dictionaries out of that shared list, rather than copying a slice to every child process:

def CalcManager(myData,start,end):
    print 'in calc manager'
    print type(myData[0])

    manager = Manager()
    d = manager.list(myData)

    nprocs = 3 
    result_q = multiprocessing.Queue()
    procs = []

    interval = (end-start)/nprocs 
    new_start = start

    for i in range(nprocs):
        new_end = new_start + interval
        if new_end > end:
            new_end = end 
        p = multiprocessing.Process(target=multiProcess,
                                    args=(d, new_start, new_end, result_q, i))
        procs.append(p)
        p.start()
        #Increment our next start to the current end 
        new_start = new_end+1
    print 'finished starting'        

    for i in range(nprocs):
        result = result_q.get()
        print len(result)

    #Join the processes to wait for all data/processes to be finished
    for p in procs:
        p.join()

This copies your entire data list to the Manager process before any workers are created. The Manager returns a Proxy object that allows shared access to the list. You then just pass the Proxy to the workers, which means their startup time is greatly reduced, since slices of the data list no longer need to be copied to them. The downside is that accessing the list inside the children is slower, because every access goes through IPC to the manager process. Whether or not this actually helps performance depends heavily on the work you do on the list in the worker processes, but it is worth a try since it requires very few code changes.
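To make that trade-off concrete, here is a minimal sketch (not part of the original answer) contrasting the two access patterns against a manager-hosted list. The function names and the toy sum over each dictionary's values are placeholders for whatever calculation the workers actually do; copying the slice once per worker costs a single IPC round trip, while per-item indexing costs one round trip per access:

from multiprocessing import Manager, Process, Queue

def fetch_slice_worker(shared_list, start, end, result_q):
    #One IPC round trip: slicing a ListProxy returns a plain local list copy
    local = shared_list[start:end]
    #Every dictionary access below is an ordinary in-process lookup
    totals = [sum(d.itervalues()) for d in local]
    result_q.put((start, end, totals))

def item_by_item_worker(shared_list, start, end, result_q):
    #Each index into the proxy is a separate round trip to the manager process
    totals = []
    for i in xrange(start, end):
        d = shared_list[i]
        totals.append(sum(d.itervalues()))
    result_q.put((start, end, totals))

if __name__ == '__main__':
    manager = Manager()
    shared_list = manager.list([{str(k): k for k in range(200)} for _ in range(100)])
    result_q = Queue()
    p = Process(target=fetch_slice_worker, args=(shared_list, 0, 50, result_q))
    p.start()
    print result_q.get()[:2]
    p.join()

The slice-copy pattern is effectively what the solved code above ends up doing with data = data[start:end] inside multiProcess.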

For python - Multiprocessing passing an array of dictionaries through shared memory, see the original question on Stack Overflow: https://stackoverflow.com/questions/25620211/
