python - 如何在内核空闲时通过添加进程在 python 中执行批处理计算?

标签 python python-2.7 parallel-processing multiprocessing

Bash 有一个函数“wait -n”,可以用一种相对简单的方式来停止子进程的后续执行,直到一定数量的处理器内核可用。例如。我可以执行以下操作,

for IJOB in IJOBRANGE;
do

    ./func.x ${IJOB}

    # checking the number of background processes
    # and halting the execution accordingly

    bground=( $(jobs -p) );

    if (( ${#bground[@]} >= CORES )); then
        wait -n
    fi

done || exit 1

此代码段可以批量执行具有不同参数的任意 C 进程“func.x”,并始终保持固定数量的子进程并行实例,设置为值“CORES”。

我想知道是否可以用 python 脚本完成类似的事情,并且 python 子进程(或函数)。目前,我定义了一个 python 函数,设置了一个一维参数数组,并使用 python 多处理模块中的 Pool 例程在参数数组上并行计算该函数。池函数对我的函数执行一定数量的评估(在以下示例中为 CPU CORES 的数量),并等待直到派生进程的所有实例都结束,然后再移动到下一批。

import multiprocessing as mp

def func(x):

    # some computation with x

def main(j):

    # setting the parameter array
    xarray = range(j)

    pool = mp.Pool()
    pool.map(func,xarray)

我想知道是否可以修改此代码段以便始终对我的子例程执行固定数量的并行计算,即在其中一个子进程完成后立即添加另一个进程。这里的所有“func”进程都应该是独立的,执行顺序也无关紧要。我是 python 方式的新手,如果有一些有用的观点真的很棒。

最佳答案

根据我们在评论中的讨论,这里有一些测试代码改编自您的代码,显示 Pool 在将新任务分配给可用工作人员之前不要等待所有并行任务完成:

import multiprocessing as mp
from time import sleep, time


def func(x):
    """sleeps for x seconds"""
    name = mp.current_process().name
    print("{} {}: sleep {}".format(time(), name, x))
    sleep(x)
    print("{} {}: done sleeping".format(time(), name))


def main():

    # A pool of two processes, for the sake of simplicity
    pool = mp.Pool(processes=2)
    # Here's how that works out visually:
    #
    #    0s        1s       2s        3s
    # P1 [sleep(1)][     sleep(2)     ]
    # P2 [     sleep(2)     ][sleep(1)]
    sleeps = [1, 2, 2, 1]
    pool.map(func, sleeps)


if __name__ == "__main__":
    main()

运行此代码给出(为清楚起见简化了时间戳):

$ python3 mp.py 
0s: ForkPoolWorker-1: sleep 1
0s: ForkPoolWorker-2: sleep 2
1s: ForkPoolWorker-1: done sleeping
1s: ForkPoolWorker-1: sleep 2
2s: ForkPoolWorker-2: done sleeping
2s: ForkPoolWorker-2: sleep 1
3s: ForkPoolWorker-1: done sleeping
3s: ForkPoolWorker-2: done sleeping

我们可以看到第一个进程在开始第二个任务之前没有等待第二个进程完成它的第一个任务。

所以我想这应该回答了你提出的问题,希望我已经清楚地理解你。

关于python - 如何在内核空闲时通过添加进程在 python 中执行批处理计算?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49757746/

相关文章:

python - 将 csv 中的行拆分为 python 中的行

concurrency - 竞争和死锁的区别

python - 如何为 anaconda 1.9.1 和 Python 3.3.4 安装 Matplotlib?

python - 如何使用多个类从 QDialog 获取值

python - celery 使用 'application/x-python-serialize' 而不是 `application/json`

Python设置异常

random - 并行循环和随机产生奇怪的结果

design-patterns - 并行实现树遍历算法的策略?

python - 将十六进制字符串转换为 bytes 函数的正确形式

python - 如何简化 IF 语句