python - Why do multiprocessing.Pool and multiprocessing.Process behave so differently on Linux

Tags: python linux multiprocessing

I ran some test code to compare the performance of Pool and Process on Linux. I am using Python 2.7. The source code of multiprocessing.Pool suggests that it is built on multiprocessing.Process, yet multiprocessing.Pool takes far more time and memory than the same number of multiprocessing.Process workers, and I don't understand why.

This is what I did:

  1. Create a large dict, then spawn the subprocesses.

  2. Pass the dict to each subprocess for read-only access.

  3. Each subprocess does some computation and returns a small result.

Below is my test code:

from multiprocessing import Pool, Process, Queue
import time, psutil, os, gc

gct = time.time
costTime = lambda ET: time.strftime('%H:%M:%S', time.gmtime(int(ET)))

def getMemConsumption():
    procId = os.getpid()
    proc = psutil.Process(procId)
    mem = proc.memory_info().rss
    return "process ID %d.\nMemory usage: %.6f GB" % (procId, mem*1.0/1024**3)

def f_pool(l, n, jobID):
    try:
        result = {}
        # example of subprocess work
        for i in xrange(n):
            result[i] = l[i]
        # work done
        # gc.collect()
        print getMemConsumption()
        return 1, result, jobID
    except:
        return 0, {}, jobID

def f_proc(q, l, n, jobID):
    try:
        result = {}
        # example of subprocess work
        for i in xrange(n):
            result[i] = l[i]
        # work done
        print getMemConsumption()
        q.put([1, result, jobID])
    except:
        q.put([0, {}, jobID])

def initialSubProc(targetFunc, procArgs, jobID):
    outQueue = Queue()
    args = [outQueue]
    args.extend(procArgs)
    args.append(jobID)
    p = Process(target = targetFunc, args = tuple(args))
    p.start()
    return p, outQueue


def track_add_Proc(procList, outQueueList, maxProcN, jobCount, 
                   maxJobs, targetFunc, procArgs, joinFlag, all_results):
    if len(procList) < maxProcN:
        p, q = initialSubProc(targetFunc, procArgs, jobCount)
        outQueueList.append(q)
        procList.append(p)
        jobCount += 1
        joinFlag.append(0)
    else:
        for i in xrange(len(procList)):
            if not procList[i].is_alive() and joinFlag[i] == 0:
                procList[i].join()
                all_results.append(outQueueList[i].get())
                joinFlag[i] = 1 # avoid collecting the result of an already-joined subprocess twice
                if jobCount < maxJobs:
                    p, q = initialSubProc(targetFunc, procArgs, jobCount)
                    procList[i] = p
                    outQueueList[i] = q
                    jobCount += 1
                    joinFlag[i] = 0
    return jobCount

if __name__ == '__main__':
    st = gct()
    d = {i:i**2 for i in xrange(10000000)}
    print "MainProcess create data dict\n%s" % getMemConsumption()
    print 'Time to create dict: %s\n\n' % costTime(gct()-st)

    nproc = 2
    jobs = 8
    subProcReturnDictLen = 1000
    procArgs = [d, subProcReturnDictLen]

    print "Use multiprocessing.Pool, max subprocess = %d, jobs = %d\n" % (nproc, jobs)
    st = gct()
    pool = Pool(processes = nproc)
    for i in xrange(jobs):
        procArgs.append(i)
        sp = pool.apply_async(f_pool, tuple(procArgs))
        procArgs.pop(2)
        res = sp.get()
        if res[0] == 1:
            # do something with the result
            pass
        else:
            # do something with subprocess exception handle
            pass
    pool.close()
    pool.join()
    print "Total time used to finish all jobs: %s" % costTime(gct()-st)
    print "Main Process\n", getMemConsumption(), '\n'

    print "Use multiprocessing.Process, max subprocess = %d, jobs = %d\n" % (nproc, jobs)
    st = gct()
    procList = []
    outQueueList = []
    all_results = []
    jobCount = 0
    joinFlag = []
    while (jobCount < jobs):
        jobCount = track_add_Proc(procList, outQueueList, nproc, jobCount, 
                                  jobs, f_proc, procArgs, joinFlag, all_results)
    for i in xrange(nproc):
        if joinFlag[i] == 0:
            procList[i].join()
            all_results.append(outQueueList[i].get())
            joinFlag[i] = 1
    for i in xrange(jobs):
        res = all_results[i]
        if res[0] == 1:
            # do something with the result
            pass
        else:
            # do something with subprocess exception handle
            pass
    print "Total time used to finish all jobs: %s" % costTime(gct()-st)
    print "Main Process\n", getMemConsumption()

The results are as follows:

MainProcess create data dict
process ID 21256.
Memory usage: 0.841743 GB
Time to create dict: 00:00:02


Use multiprocessing.Pool, max subprocess = 2, jobs = 8

process ID 21266.
Memory usage: 1.673084 GB
process ID 21267.
Memory usage: 1.673088 GB
process ID 21266.
Memory usage: 2.131172 GB
process ID 21267.
Memory usage: 2.131172 GB
process ID 21266.
Memory usage: 2.176079 GB
process ID 21267.
Memory usage: 2.176083 GB
process ID 21266.
Memory usage: 2.176079 GB
process ID 21267.
Memory usage: 2.176083 GB

Total time used to finish all jobs: 00:00:49
Main Process
process ID 21256.
Memory usage: 0.843079 GB 


Use multiprocessing.Process, max subprocess = 2, jobs = 8

process ID 23405.
Memory usage: 0.840614 GB
process ID 23408.
Memory usage: 0.840618 GB
process ID 23410.
Memory usage: 0.840706 GB
process ID 23412.
Memory usage: 0.840805 GB
process ID 23415.
Memory usage: 0.840900 GB
process ID 23417.
Memory usage: 0.840973 GB
process ID 23419.
Memory usage: 0.841061 GB
process ID 23421.
Memory usage: 0.841152 GB

Total time used to finish all jobs: 00:00:00
Main Process
process ID 21256.
Memory usage: 0.843781 GB

I don't understand why the subprocesses from multiprocessing.Pool need about 1.6 GB at the start, while the subprocesses from multiprocessing.Process only need 0.84 GB, which equals the memory cost of the main process. It seems to me that only multiprocessing.Process enjoys the benefit of Linux's copy-on-write, since the time needed for all of its jobs is under 1 second. I don't know why multiprocessing.Pool does not benefit from it as well. Judging from the source code, multiprocessing.Pool seems to be just a wrapper around multiprocessing.Process.
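(Side note on measuring this: RSS counts the copy-on-write pages a forked child still shares with its parent, so a child can report about 0.84 GB while using almost no private memory. Below is a rough sketch of an alternative reporter that also prints USS, the memory unique to the process; it assumes psutil >= 4.0 so that memory_full_info() is available, and getMemConsumptionFull is just a hypothetical helper name.)

import os, psutil

def getMemConsumptionFull():
    # Like getMemConsumption(), but also reports USS (memory unique to this
    # process), which excludes pages still shared with the parent after fork.
    proc = psutil.Process(os.getpid())
    info = proc.memory_full_info()   # psutil >= 4.0
    return "process ID %d.\nRSS: %.6f GB, USS: %.6f GB" % (
        os.getpid(), info.rss * 1.0 / 1024**3, info.uss * 1.0 / 1024**3)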

Best Answer

Question: I don't know why subprocesses from multiprocessing.Pool need about 1.6GB in the beginning,
... Pool seems like a wrapper of multiprocessing.Process

This is because Pool reserves memory for the results of all of the jobs.
Second, Pool uses two SimpleQueue()s and three Threads internally.
Third, Pool copies all of the passed argv data before handing them over to a process.
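A rough way to see the cost behind the third point is to time one round of serializing the big dict, since every task submitted to the Pool has to pickle its arguments for the task queue (a sketch only; the exact protocol and queue overhead inside Pool may differ):

import cPickle as pickle
import time

d = {i: i**2 for i in xrange(10000000)}

t0 = time.time()
blob = pickle.dumps(d, pickle.HIGHEST_PROTOCOL)   # roughly what each task submission pays
print "pickled size: %.0f MB, time: %.1f s" % (len(blob) / 1024.0**2, time.time() - t0)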

Your Process example uses only one Queue() for everything and passes the argv as-is.

Pool is far from being just a wrapper.
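One possible workaround that keeps Pool but still benefits from copy-on-write on Linux (a sketch, assuming the default fork start method; big_dict and f_pool_global are hypothetical names, not part of the code above) is to let the workers read the dict as a module-level global that already exists when the Pool forks, so only the small per-job arguments travel through the pickled task queue:

from multiprocessing import Pool

big_dict = {}   # filled in the parent BEFORE the Pool forks its workers

def f_pool_global(n, jobID):
    # read-only access to the dict inherited via fork; big_dict is never pickled
    result = {}
    for i in xrange(n):
        result[i] = big_dict[i]
    return 1, result, jobID

if __name__ == '__main__':
    big_dict.update((i, i**2) for i in xrange(10000000))
    pool = Pool(processes = 2)                    # workers are forked here
    jobs = [pool.apply_async(f_pool_global, (1000, j)) for j in xrange(8)]
    print [r.get()[2] for r in jobs]              # only the small results travel back
    pool.close()
    pool.join()

With this layout big_dict is not re-serialized per task, so the per-worker memory should stay much closer to the Process example (pages touched by reference-count updates can still be copied, but only those pages).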

About "python - Why do multiprocessing.Pool and multiprocessing.Process behave so differently on Linux", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/44207826/
