Python multiprocessing.Pool interaction with class objective functions and neuroevolution

Tags: python multiprocessing pytorch genetic-algorithm

Warning: this will be long, because I want to be as specific as possible.


Exact problem: this is a multiprocessing problem. I have made sure in previous experiments that all of my classes behave as built/expected.

EDIT: I said threading earlier.


When I run a toy example of my problem in a threaded setting, everything works fine; however, when I transition to my real problem, the code breaks. Specifically, I get a TypeError: can't pickle _thread.lock objects error. The full stack trace is at the bottom.

My threading needs here are a bit different from the example I adapted my code from -- https://github.com/CMA-ES/pycma/issues/31. In that example there is a single fitness function that each evaluation can call independently, and none of the calls interact with each other. In my actual problem, however, we are trying to optimize neural-network weights with a genetic algorithm. The GA proposes candidate weights, and we need to evaluate those NN controller weights in our environment. In the single-threaded case we can have just one environment and evaluate the weights with a simple for loop: [nn.evaluate(weights) for weights in potential_candidates], find the best-performing individual, and use those weights in the next round of mutation. However, we cannot simply run a single simulation in a threaded setting.

So instead of passing a single function to evaluate, I pass a list of functions (one per individual; the environment is the same, but we have forked the processes, so the communication streams between individuals do not interact).
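To make that concrete, here is a minimal, self-contained sketch of the serial version of this structure (DummyEnv and make_eval_fn are illustrative stand-ins, not my real environment/agent classes): each callable closes over its own environment instance, so evaluations never share state.

import numpy as np

class DummyEnv:
    """Stand-in for a real simulation environment; fitness is just -||w||^2."""
    def evaluate(self, weights):
        return -float(np.sum(np.asarray(weights) ** 2))

def make_eval_fn(env):
    """Build one evaluation callable that owns its own environment."""
    def eval_fn(weights):
        return env.evaluate(weights)
    return eval_fn

NPOPULATION = 5
eval_fns = [make_eval_fn(DummyEnv()) for _ in range(NPOPULATION)]   # one callable per individual
candidates = [np.random.randn(10) for _ in range(NPOPULATION)]      # dummy weight vectors
fitnesses = [f(w) for f, w in zip(eval_fns, candidates)]            # serial evaluation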

One more thing to note right away: I am using the parallel-evaluation data structure from neat

from neat.parallel import ParallelEvaluator # uses multiprocessing.Pool

Toy example code:

NPARAMS = nn.flat_init_weights.shape[0]    # make this a 1000-dimensional problem.
NPOPULATION = 5                            # use population size of 5.
MAX_ITERATION = 100                        # run each solver for 100 function calls.

import time
from neat.parallel import ParallelEvaluator  # uses multiprocessing.Pool
import cma

def fitness(x):
    time.sleep(0.1)
    return sum(x**2)

# # serial evaluation of all solutions
# def serial_evals(X, f=fitness, args=()):
#     return [f(x, *args) for x in X]

# parallel evaluation of all solutions
def _evaluate2(self, weights, *args):
    """redefine evaluate without the dependencies on neat-internal data structures
    """
    jobs = []
    for i, w in enumerate(weights):
        jobs.append(self.pool.apply_async(self.eval_function[i], (w, ) + args))

    return [job.get() for job in jobs]

ParallelEvaluator.evaluate2 = _evaluate2
parallel_eval = ParallelEvaluator(12, [fitness]*NPOPULATION)

# time both
for eval_all in [parallel_eval.evaluate2]:
    es = cma.CMAEvolutionStrategy(NPARAMS * [1], 1, {'maxiter': MAX_ITERATION, 
                                                     'popsize': NPOPULATION})
    es.disp_annotation()
    while not es.stop():
        X = es.ask()
        es.tell(X, eval_all(X))
    es.disp()

Necessary background:

The code above fails when I switch from the toy example to my real code.

My classes are:

LevelGenerator (simple GA class that implements mutate, etc)
GridGame (OpenAI wrapper; launches a Java server in which to run the simulation; 
          handles all communication between the Agent and the environment)
Agent    (neural-network class, has an evaluate fn which uses the NN to play a single rollout)
Objective (handles serializing/de-serializing weights: numpy <--> torch; launching the evaluate function)

# The classes get composed to get the necessary behavior:
env   = GridGame(Generator)
agent = NNAgent(env)                # NNAgent is a subclass of (Random) Agent)
obj   = PyTorchObjective(agent)

# My code normally all interacts like this in the single-threaded case:

def test_solver(solver): # Solver: CMA-ES, Differential Evolution, EvolutionStrategy, etc
    history = []
    for j in range(MAX_ITERATION):
        solutions = solver.ask() #2d-numpy array. (POPSIZE x NPARAMS)
        fitness_list = np.zeros(solver.popsize)
        for i in range(solver.popsize):
            fitness_list[i] = obj.function(solutions[i], len(solutions[i]))
        solver.tell(fitness_list)
        result = solver.result() # first element is the best solution, second element is the best fitness
        history.append(result[1])

        scores[j] = fitness_list

    return history, result

So, when I try to run:

NPARAMS = nn.flat_init_weights.shape[0]        
NPOPULATION = 5                                
MAX_ITERATION = 100                            

_x = NNAgent(GridGame(Generator))

gyms = [_x.mutate(0.0) for _ in range(NPOPULATION)]
objs = [PyTorchObjective(a) for a in gyms]

def evaluate(objective, weights):
    return objective.fun(weights, len(weights))

import time
from neat.parallel import ParallelEvaluator  # uses multiprocessing.Pool
import cma

def fitness(agent):
    return agent.evaluate()

# # serial evaluation of all solutions
# def serial_evals(X, f=fitness, args=()):
#     return [f(x, *args) for x in X]

# parallel evaluation of all solutions
def _evaluate2(self, X, *args):
    """redefine evaluate without the dependencies on neat-internal data structures
    """
    jobs = []
    for i, x in enumerate(X):
        jobs.append(self.pool.apply_async(self.eval_function[i], (x, ) + args))

    return [job.get() for job in jobs]

ParallelEvaluator.evaluate2 = _evaluate2
parallel_eval = ParallelEvaluator(12, [obj.fun for obj in objs])
# obj.fun takes in the candidate weights, loads them into the NN, and then evaluates the NN in the environment.

# time both
for eval_all in [parallel_eval.evaluate2]:
    es = cma.CMAEvolutionStrategy(NPARAMS * [1], 1, {'maxiter': MAX_ITERATION, 
                                                     'popsize': NPOPULATION})
    es.disp_annotation()
    while not es.stop():
        X = es.ask()
        es.tell(X, eval_all(X, NPARAMS))
    es.disp()

I get the following error:

TypeError                            Traceback (most recent call last)
<ipython-input-57-3e6b7bf6f83a> in <module>
      6     while not es.stop():
      7         X = es.ask()
----> 8         es.tell(X, eval_all(X, NPARAMS))
      9     es.disp()

<ipython-input-55-2182743d6306> in _evaluate2(self, X, *args)
     14         jobs.append(self.pool.apply_async(self.eval_function[i], (x, ) + args))
     15 
---> 16     return [job.get() for job in jobs]

<ipython-input-55-2182743d6306> in <listcomp>(.0)
     14         jobs.append(self.pool.apply_async(self.eval_function[i], (x, ) + args))
     15 
---> 16     return [job.get() for job in jobs]

~/miniconda3/envs/thesis/lib/python3.7/multiprocessing/pool.py in get(self, timeout)
    655             return self._value
    656         else:
--> 657             raise self._value
    658 
    659     def _set(self, i, obj):

~/miniconda3/envs/thesis/lib/python3.7/multiprocessing/pool.py in _handle_tasks(taskqueue, put, outqueue, pool, cache)
    429                         break
    430                     try:
--> 431                         put(task)
    432                     except Exception as e:
    433                         job, idx = task[:2]

~/miniconda3/envs/thesis/lib/python3.7/multiprocessing/connection.py in send(self, obj)
    204         self._check_closed()
    205         self._check_writable()
--> 206         self._send_bytes(_ForkingPickler.dumps(obj))
    207 
    208     def recv_bytes(self, maxlength=None):

~/miniconda3/envs/thesis/lib/python3.7/multiprocessing/reduction.py in dumps(cls, obj, protocol)
     49     def dumps(cls, obj, protocol=None):
     50         buf = io.BytesIO()
---> 51         cls(buf, protocol).dump(obj)
     52         return buf.getbuffer()
     53 

TypeError: can't pickle _thread.lock objects

I also read here that this might be caused by the fact that this is a class function -- TypeError: can't pickle _thread.lock objects -- so I created the fitness function in the global scope, def fitness(agent): return agent.evaluate(), but that did not work either.

I thought the error might be because I originally had the evaluate function inside the PyTorchObjective class as a lambda function, but the error persisted after I changed that.
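For reference, a quick check that narrows down what is actually failing to pickle (a sketch, assuming the objs list from the snippet above is in scope) is to try pickling one objective directly; if this raises the same TypeError, the problem is the object being shipped to the workers, not where the fitness function is defined.

import pickle

try:
    pickle.dumps(objs[0])                     # objs from the snippet above
except TypeError as err:
    print("objective itself is not picklable:", err)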

Any insight would be greatly appreciated, and thank you for reading this giant wall of text.

Best answer

You are not using multiple threads. You are using multiple processes.

All arguments that you pass to apply_async, including the function itself, are serialized (pickled) under the hood and passed to a worker process via an IPC channel (read the multiprocessing documentation for details). So you cannot pass any entities that are tied to things which are by their nature process-local. That includes most synchronization primitives, since they must use locks to perform atomic operations.
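Here is a minimal, self-contained reproduction of that mechanism (toy classes of mine, not yours): the argument handed to apply_async carries a threading.Lock, so pickling the task fails and the error surfaces at job.get(), just as in your traceback.

import multiprocessing as mp
import threading

class HasLock:
    def __init__(self):
        self.lock = threading.Lock()          # process-local, not picklable
    def fitness(self, x):
        return x * x

def call_fitness(obj, x):
    return obj.fitness(x)

if __name__ == "__main__":
    obj = HasLock()
    with mp.Pool(2) as pool:
        job = pool.apply_async(call_fitness, (obj, 3))
        try:
            print(job.get())
        except TypeError as err:
            # e.g. "can't pickle _thread.lock objects" (wording varies by Python version)
            print("pickling failed:", err)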

Whenever this happens (as many other questions about this error message show), you are probably being too clever and handing an object that already has parallelization logic built in to a parallelization framework.


If you want to create "multiple levels of parallelization" with such a "parallelized object", you are better off either relying on that object's own parallelization mechanism and dropping the outer level, or creating and using those "parallelized objects" inside the worker processes themselves, so that they never have to be pickled and sent between processes.
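In your setting, the second option would mean constructing the heavy, process-local objects (the GridGame environment with its Java server, the NNAgent, the PyTorchObjective) once per worker, for example via a Pool initializer, and sending only plain weight vectors over the IPC channel. Below is a sketch with a stand-in objective; DummyObjective, init_worker, and eval_weights are illustrative names, not your classes.

import multiprocessing as mp
import numpy as np

NPARAMS = 1000
NPOPULATION = 5

class DummyObjective:
    """Stand-in for PyTorchObjective(NNAgent(GridGame(Generator))): it owns
    'heavy' process-local state, so it is built inside each worker instead of
    being pickled and shipped to it."""
    def fun(self, weights, n):
        return float(np.sum(np.asarray(weights[:n]) ** 2))

_worker_objective = None                      # one objective per worker process

def init_worker():
    global _worker_objective
    _worker_objective = DummyObjective()      # replace with the real construction

def eval_weights(weights):
    return _worker_objective.fun(weights, len(weights))

if __name__ == "__main__":
    with mp.Pool(processes=4, initializer=init_worker) as pool:
        X = [np.random.randn(NPARAMS) for _ in range(NPOPULATION)]   # candidate weights
        print(pool.map(eval_weights, X))      # only numpy arrays cross the process boundary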

The original question can be found on Stack Overflow: https://stackoverflow.com/questions/59441355/
