Python 多处理 block 在 waiter.acquire() 中不确定

标签 python concurrency multiprocessing freeze

有人可以解释为什么这段代码会阻塞并且无法完成吗?

我遵循了 multiprocessing 的几个例子我写了一些非常相似的代码,不会被阻止。但是,显然,我看不出该工作代码与下面的代码之间有什么区别。一切都很好,我想。它一直到 .get(),但没有一个进程完成。

问题是 python3 在 waiter.acquire() 中无限期地阻塞,你可以通过中断它并读取回溯来判断。

$ python3 ./try415.py
^CTraceback (most recent call last):
  File "./try415.py", line 43, in <module>
    ps = [ res.get() for res in proclist ]
  File "./try415.py", line 43, in <listcomp>
    ps = [ res.get() for res in proclist ]
  File "/usr/lib64/python3.6/multiprocessing/pool.py", line 638, in get
    self.wait(timeout)
  File "/usr/lib64/python3.6/multiprocessing/pool.py", line 635, in wait
    self._event.wait(timeout)
  File "/usr/lib64/python3.6/threading.py", line 551, in wait
    signaled = self._cond.wait(timeout)
  File "/usr/lib64/python3.6/threading.py", line 295, in wait
    waiter.acquire()
KeyboardInterrupt

这是代码
from multiprocessing import Pool
from scipy import optimize
import numpy as np

def func(t, a, b, c):
    return 0.5*a*t**2 + b*t + c

def funcwrap(t, params):
    return func(t, *params)

def fitWithErr(procid, yFitValues, simga, func, p0, args, bounds):
    np.random.seed() # force new seed
    randomDelta = np.random.normal(0., sigma, len(yFitValues))
    randomdataY = yFitValues + randomDelta
    errfunc = lambda p, x, y: func(p, x) -y
    optResult = optimize.least_squares(errfunc, p0, args=args, bounds=bounds)
    return optResult.x

def fit_bootstrap(function, datax, datay, p0, bounds, aprioriUnc):
    errfunc = lambda p, x, y: function(x,p) - y
    optResult = optimize.least_squares(errfunc, x0=p0, args=(datax, datay), bounds=bounds)
    pfit = optResult.x
    residuals = optResult.fun
    fity = function(datax, pfit)

    numParallelProcesses = 2**2 # should be equal to number of ALUs
    numTrials = 2**2 # this many random data sets are generated and fitted
    trialParameterList = list()
    for i in range(0,numTrials):
        trialParameterList.append( [i, fity, aprioriUnc, function, p0, (datax, datay), bounds] )

    with Pool(processes=numParallelProcesses) as pool:
        proclist = [ pool.apply_async(fitWithErr, args) for args in trialParameterList ]

    ps = [ res.get() for res in proclist ]
    ps = np.array(ps)
    mean_pfit = np.mean(ps,0)

    return mean_pfit

if __name__ == '__main__':
    x = np.linspace(0,3,2000)
    p0 = [-9.81, 1., 0.]
    y = funcwrap(x, p0)
    bounds = [ (-20,-1., -1E-6),(20,3,1E-6) ]
    fit_bootstrap(funcwrap, x, y, p0, bounds=bounds, aprioriUnc=0.1)

最佳答案

很抱歉给出错误的答案。不去验证太不负责任了。这是我的答案。

with Pool(processes=numParallelProcesses) as pool:

这一行是错误的,将调用 退出 功能不关闭。这是退出 函数体:
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.terminate()

所有的过程都将被终止,永远不会被执行。
代码:
ps = [ res.get() for res in proclist ]

没有超时参数。这是 get 函数体:
def get(self, timeout=None):
    self.wait(timeout)
    if not self.ready():
        raise TimeoutError
    if self._success:
        return self._value
    else:
        raise self._value

如果没有超时,它将始终等待。这就是它挂起的原因。

你需要改变
with Pool(processes=numParallelProcesses) as pool:
    proclist = [ pool.apply_async(fitWithErr, args) for args in trialParameterList ]

到:
pool=Pool(processes=numParallelProcesses)
proclist = [ pool.apply_async(fitWithErr, args) for args in trialParameterList ]
pool.close()

关于Python 多处理 block 在 waiter.acquire() 中不确定,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51243539/

相关文章:

Python 多处理 PicklingError : Can't pickle <type 'function' >

Python:将 n 元组转换为 x 元组,其中 x < n

python - 如何在 Python 中解析和比较 ISO 8601 持续时间?

concurrency - 使用 OpenMP 的数组中的最大值

multithreading - 在多处理器中, "mov eax,mem"可以与 "lock inc mem"同时出现吗?

Python多处理池一些进程在 fork 时处于死锁状态,但在生成时运行

python - 如何在 Git Bash 的 Vim 中添加 Python 支持?

Python:如何查找列表中特定数量的项目是否相同?

multithreading - 如何创建可在 Rust 多线程服务器中使用的结构?

go - 在多个goroutine之间共享的Golang结构中,非共享成员是否需要互斥保护?