python - Memory usage steadily growing for multiprocessing.Pool.imap_unordered

Tags: python multithreading multiprocessing

I just noticed that my program is using more and more memory as it processes a large file. It only processes one line at a time, though, so I couldn't figure out why it would keep using more memory.

After a lot of digging, I realized the program has three parts:

  1. Load the data, one line at a time.
  2. Process each line in a multiprocessing.Pool with imap_unordered().
  3. Process each line in a single thread.

If steps 1 and 2 are faster than step 3, then the results from the pool workers queue up, consuming memory.

How can I throttle what step 2 feeds into the pool, so that it doesn't get ahead of the consumer in step 3?

This looks similar to another multiprocessing question, but it isn't clear to me where the delay is in that question.

Here's a small example that demonstrates the problem:

import logging
import os
import multiprocessing
from time import sleep

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s:%(process)d:%(thread)d:%(message)s')
logger = logging.getLogger()

def process_step1():
    data = 'a' * 100000
    for i in range(10000):
        sleep(.001)  # Faster than step 3.
        yield data
        if i % 1000 == 0:
            logger.info('Producing %d.', i)
    logger.info('Finished producing.')


def process_step2(data):
    return data.upper()


def process_step3(up_data):
    assert up_data == 'A' * 100000
    sleep(.005)  # Slower than step 1.


def main():
    pool = multiprocessing.Pool(processes=10)
    logger.info('Starting.')
    loader = process_step1()
    processed = pool.imap_unordered(process_step2, loader)
    for i, up_data in enumerate(processed):
        process_step3(up_data)
        if i % 500 == 0:
            logger.info('Consuming %d, using %0.1f MB.', i, get_memory())
    logger.info('Done.')


def get_memory():
    """ Look up the memory usage, return in MB. """
    proc_file = '/proc/{}/status'.format(os.getpid())
    scales = {'KB': 1024.0, 'MB': 1024.0 * 1024.0}
    with open(proc_file) as f:
        for line in f:
            if 'VmSize:' in line:
                fields = line.split()
                size = int(fields[1])
                scale = fields[2].upper()
                return size*scales[scale]/scales['MB']
    return 0.0  # Unknown

if __name__ == '__main__':
    main()

When I run it, I see memory usage climb steadily until step 1 finishes producing. If I let it run long enough after that, memory usage starts to fall.

2016-12-01 15:37:50,859:6414:139712380557056:Starting.
2016-12-01 15:37:50,861:6414:139712266237696:Producing 0.
2016-12-01 15:37:50,868:6414:139712380557056:Consuming 0, using 255.0 MB.
2016-12-01 15:37:52,054:6414:139712266237696:Producing 1000.
2016-12-01 15:37:53,244:6414:139712266237696:Producing 2000.
2016-12-01 15:37:53,421:6414:139712380557056:Consuming 500, using 383.0 MB.
2016-12-01 15:37:54,446:6414:139712266237696:Producing 3000.
2016-12-01 15:37:55,635:6414:139712266237696:Producing 4000.
2016-12-01 15:37:55,976:6414:139712380557056:Consuming 1000, using 511.2 MB.
2016-12-01 15:37:56,831:6414:139712266237696:Producing 5000.
2016-12-01 15:37:58,019:6414:139712266237696:Producing 6000.
2016-12-01 15:37:58,529:6414:139712380557056:Consuming 1500, using 703.2 MB.
2016-12-01 15:37:59,209:6414:139712266237696:Producing 7000.
2016-12-01 15:38:00,406:6414:139712266237696:Producing 8000.
2016-12-01 15:38:01,084:6414:139712380557056:Consuming 2000, using 831.5 MB.
2016-12-01 15:38:01,602:6414:139712266237696:Producing 9000.
2016-12-01 15:38:02,802:6414:139712266237696:Finished producing.
2016-12-01 15:38:03,640:6414:139712380557056:Consuming 2500, using 959.5 MB.
2016-12-01 15:38:06,199:6414:139712380557056:Consuming 3000, using 959.5 MB.

Best answer

It seems that Pool.imap_unordered() starts a new thread to iterate over the input sequence generated by step 1, so we need to throttle that thread from the main thread, which runs step 3. The Semaphore class is designed for limiting one thread relative to another, so we call acquire() before each line is produced and release() as each line is consumed. If we start the semaphore at some arbitrary value, say 100, it builds up a buffer of 100 lines before blocking and waiting for the consumer to catch up.
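Distilled down to just that handshake, the pattern looks like this (a minimal runnable sketch, not part of the original answer; the data and the work function are trivial stand-ins for steps 1 and 2):

import multiprocessing
from threading import Semaphore

semaphore = Semaphore(100)  # Let the producer run at most 100 items ahead.

def producer():
    for item in range(1000):  # Stand-in for step 1's data source.
        semaphore.acquire()   # Blocks while 100 items are already in flight.
        yield item

def work(item):               # Stand-in for step 2.
    return item * 2

if __name__ == '__main__':
    with multiprocessing.Pool(processes=10) as pool:
        for result in pool.imap_unordered(work, producer()):
            semaphore.release()  # Step 3 consumed one result: free a slot.

The full program, with the semaphore threaded through, follows: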

import logging
import os
import multiprocessing
from threading import Semaphore
from time import sleep

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s:%(process)d:%(thread)d:%(message)s')
logger = logging.getLogger()

def process_step1(semaphore):
    data = 'a' * 100000
    for i in range(10000):
        semaphore.acquire()  # Block until the consumer frees a slot.
        sleep(.001)  # Faster than step 3.
        yield data
        if i % 1000 == 0:
            logger.info('Producing %d.', i)
    logger.info('Finished producing.')


def process_step2(data):
    return data.upper()


def process_step3(up_data, semaphore):
    assert up_data == 'A' * 100000
    sleep(.005)  # Slower than step 1.
    semaphore.release()  # Let the producer generate one more line.


def main():
    pool = multiprocessing.Pool(processes=10)
    semaphore = Semaphore(100)  # Buffer of at most 100 lines in flight.
    logger.info('Starting.')
    loader = process_step1(semaphore)
    processed = pool.imap_unordered(process_step2, loader)
    for i, up_data in enumerate(processed):
        process_step3(up_data, semaphore)
        if i % 500 == 0:
            logger.info('Consuming %d, using %0.1f MB.', i, get_memory())
    logger.info('Done.')


def get_memory():
    """ Look up the memory usage, return in MB. """
    proc_file = '/proc/{}/status'.format(os.getpid())
    scales = {'KB': 1024.0, 'MB': 1024.0 * 1024.0}
    with open(proc_file) as f:
        for line in f:
            if 'VmSize:' in line:
                fields = line.split()
                size = int(fields[1])
                scale = fields[2].upper()
                return size*scales[scale]/scales['MB']
    return 0.0  # Unknown

if __name__ == '__main__':
    main()

Now memory usage is stable, because the producer never gets too far ahead of the consumer:

2016-12-01 15:52:13,833:6695:140124578850560:Starting.
2016-12-01 15:52:13,835:6695:140124535109376:Producing 0.
2016-12-01 15:52:13,841:6695:140124578850560:Consuming 0, using 255.0 MB.
2016-12-01 15:52:16,424:6695:140124578850560:Consuming 500, using 255.0 MB.
2016-12-01 15:52:18,498:6695:140124535109376:Producing 1000.
2016-12-01 15:52:19,015:6695:140124578850560:Consuming 1000, using 255.0 MB.
2016-12-01 15:52:21,602:6695:140124578850560:Consuming 1500, using 255.0 MB.
2016-12-01 15:52:23,675:6695:140124535109376:Producing 2000.
2016-12-01 15:52:24,192:6695:140124578850560:Consuming 2000, using 255.0 MB.
2016-12-01 15:52:26,776:6695:140124578850560:Consuming 2500, using 255.0 MB.
2016-12-01 15:52:28,846:6695:140124535109376:Producing 3000.
2016-12-01 15:52:29,362:6695:140124578850560:Consuming 3000, using 255.0 MB.
2016-12-01 15:52:31,951:6695:140124578850560:Consuming 3500, using 255.0 MB.
2016-12-01 15:52:34,022:6695:140124535109376:Producing 4000.
2016-12-01 15:52:34,538:6695:140124578850560:Consuming 4000, using 255.0 MB.
2016-12-01 15:52:37,128:6695:140124578850560:Consuming 4500, using 255.0 MB.
2016-12-01 15:52:39,193:6695:140124535109376:Producing 5000.
2016-12-01 15:52:39,704:6695:140124578850560:Consuming 5000, using 255.0 MB.
2016-12-01 15:52:42,291:6695:140124578850560:Consuming 5500, using 255.0 MB.
2016-12-01 15:52:44,361:6695:140124535109376:Producing 6000.
2016-12-01 15:52:44,878:6695:140124578850560:Consuming 6000, using 255.0 MB.
2016-12-01 15:52:47,465:6695:140124578850560:Consuming 6500, using 255.0 MB.

Update

If you're using multiprocessing.Pool, consider upgrading to concurrent.futures.process.ProcessPoolExecutor, because it handles killed workers better. That doesn't change anything for the problem described in this question, though.
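For what it's worth, ProcessPoolExecutor has no imap_unordered(), but the same throttling can be done directly in the main thread by capping the number of pending futures. A rough sketch (not from the original answer), reusing the question's original process_step1/process_step2/process_step3 (the versions without a semaphore) and the same arbitrary window of 100:

from concurrent.futures import FIRST_COMPLETED, ProcessPoolExecutor, wait

def main():
    max_pending = 100  # Arbitrary buffer, like the semaphore's initial value.
    with ProcessPoolExecutor(max_workers=10) as executor:
        pending = set()
        for data in process_step1():
            if len(pending) >= max_pending:
                # Consume at least one result before producing more.
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
                for future in done:
                    process_step3(future.result())
            pending.add(executor.submit(process_step2, data))
        for future in pending:  # Drain whatever is still in flight.
            process_step3(future.result())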

The question "python - Memory usage steadily growing for multiprocessing.Pool.imap_unordered" on Stack Overflow: https://stackoverflow.com/questions/40922526/
