python - 多处理 -> pathos.multiprocessing 和 windows

标签 python multiprocessing pickle dill pathos

我目前正在使用 python 中的标准多处理来生成一堆将无限期运行的进程。我不是特别关心性能;每个线程只是在监视文件系统上的不同更改,并在文件被修改时采取适当的操作。

目前,我有一个解决方案可以在 Linux 中满足我的需要。我有一个函数和参数字典,如下所示:

 job_dict['func1'] = {'target': func1, 'args': (args,)}

我为每个人创建了一个流程:

 import multiprocessing
 for k in job_dict.keys():
     jobs[k] = multiprocessing.Process(target=job_dict[k]['target'],
                                       args=job_dict[k]['args'])

有了这个,我可以跟踪每个正在运行的作业,并在必要时重新启动因任何原因崩溃的作业。

这在 Windows 中不起作用。我使用的许多函数都是包装器,使用各种 functools 函数,我收到有关无法序列化函数的消息(请参阅 What can multiprocessing and dill do together? )。我还没有弄清楚为什么我在 Linux 中没有收到此错误,但在 Windows 中却收到了。

如果我在 Windows 中启动我的进程之前导入 dill,我不会收到序列化错误。但是,这些进程实际上并没有做任何事情。我不知道为什么。

然后我切换到 pathos 中的多处理实现,但没有找到与标准 multiprocessing 模块中的简单 Process 类类似的类.我能够使用 pathos.pools.ThreadPool 为每个作业生成线程。这不是 map 的预期用途,我敢肯定,但它启动了所有线程,并且它们在 Windows 中运行:

import pathos
tp = pathos.pools.ThreadPool()
for k in job_dict.keys():
    tp.uimap(job_dict[k]['target'], job_dict[k]['args'])

但是,现在我不确定如何监视线程​​是否仍然处于事件状态,我正在寻找它以便我可以重新启动由于某种原因而崩溃的线程。有什么建议吗?

最佳答案

我是pathosdill 的作者。 Process 类深埋在 pathospathos.helpers.mp.process.Process 中,其中 mp 本身是 multiprocessing 库的实际分支。 multiprocessing 中的所有内容都应该可以从那里访问。

关于 pathos 的另一件事是,它会为您保持 pool 事件,直到您将其从保持状态中删除。这有助于减少创建"new"池的开销。要删除池,您需要执行以下操作:

>>> # create
>>> p = pathos.pools.ProcessPool()
>>> # remove
>>> p.clear()

但是,Process 没有这样的机制。

对于 multiprocessing,windows 与 Linux 和 Macintosh 不同……因为 windows 没有像 linux 上那样的适当的 fork……linux 可以跨进程共享对象,而在Windows 没有共享......它基本上是一个完全独立的新进程创建......因此序列化必须更好,以便对象传递到另一个进程 - 就像你将对象发送到另一台计算机一样。在 Linux 上,您必须这样做才能获得相同的行为:

def check(obj, *args, **kwds):
    """check pickling of an object across another process"""
    import subprocess
    fail = True
    try:
        _x = dill.dumps(x, *args, **kwds)
        fail = False
    finally:
        if fail:
            print "DUMP FAILED"
    msg = "python -c import dill; print dill.loads(%s)" % repr(_x)
    print "SUCCESS" if not subprocess.call(msg.split(None,2)) else "LOAD FAILED"

关于python - 多处理 -> pathos.multiprocessing 和 windows,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31732989/

相关文章:

c - 如何同时运行多个fork()?以及如何在c中使用信号量?

python - 存储 Python 字典

python - 递归求幂

python - 如何在 Python 的字典理解中创建值列表

python - 在 Linux Ubuntu 18.04 上从 pip 安装 cartopy 退出并出现各种错误

python - 如何在同一位置旋转图像?

python - 我需要在 Python 多处理环境中使用高效的共享字典

python - zmq.error.ZMQError : Address already in use, 使用 papermill 对多个笔记本运行多处理时

python - 如何解压 pkl 文件?

python - 多处理:AttributeError: 'Imported' 对象没有属性 '__private_method'