Python 多处理跳过子段错误

标签 python python-3.x multiprocessing python-multiprocessing concurrent.futures

我正在尝试对可能返回段错误的函数使用多处理(我无法控制此 ATM)。在子进程遇到段错误的情况下,我只希望那个子进程失败,但所有其他子任务继续/返回它们的结果。

我已经从 multiprocessing.Pool 切换了至 concurrent.futures.ProcessPoolExecutor避免子进程永远挂起(或直到任意超时)的问题,如此错误中所述:https://bugs.python.org/issue22393 .

但是我现在面临的问题是,当第一个子任务遇到段错误时,所有运行中的子进程都被标记为已损坏 (concurrent.futures.process.BrokenProcessPool)。

有没有办法只将实际损坏的子进程标记为损坏?

我在 Python 3.7.4 中运行的代码:

import concurrent.futures
import ctypes
from time import sleep


def do_something(x):
    print(f"{x}; in do_something")
    sleep(x*3)
    if x == 2:
        # raise a segmentation fault internally
        return x, ctypes.string_at(0)
    return x, x-1


nums = [1, 2, 3, 1.5]
executor = concurrent.futures.ProcessPoolExecutor()
result_futures = []
for num in nums:
    # Using submit with a list instead of map lets you get past the first exception
    # Example: https://stackoverflow.com/a/53346191/7619676
    future = executor.submit(do_something, num)
    result_futures.append(future)

# Wait for all results
concurrent.futures.wait(result_futures)

# After a segfault is hit for any child process (i.e. is "terminated abruptly"), the process pool becomes unusable
# and all running/pending child processes' results are set to broken
for future in result_futures:
    try:
        print(future.result())
    except concurrent.futures.process.BrokenProcessPool:
        print("broken")

结果:

(1, 0)
broken
broken
(1.5, 0.5)

期望的结果:

(1, 0)
broken
(3, 2)
(1.5, 0.5)

最佳答案

multiprocessing.Poolconcurrent.futures.ProcessPoolExecutor 都假设如何处理工作进程与主进程之间交互的并发性(如果有的话)一个进程被终止或出现段错误,因此他们会做安全的事情并将整个池标记为已损坏。要解决这个问题,您需要直接使用 multiprocessing.Process 实例构建您自己的具有不同假设的池。

这听起来可能有点吓人,但是 listmultiprocessing.Manager 会让你走得更远:

import multiprocessing
import ctypes
import queue
from time import sleep

def do_something(job, result):
    while True:
        x=job.get()
        print(f"{x}; in do_something")
        sleep(x*3)
        if x == 2:
            # raise a segmentation fault internally
            return x, ctypes.string_at(0)
        result.put((x, x-1))

nums = [1, 2, 3, 1.5]

if __name__ == "__main__":
    # you ARE using the spawn context, right?
    ctx = multiprocessing.get_context("spawn")
    manager = ctx.Manager()
    job_queue = manager.Queue(maxsize=-1)
    result_queue = manager.Queue(maxsize=-1)
    pool = [
        ctx.Process(target=do_something, args=(job_queue, result_queue), daemon=True)
        for _ in range(multiprocessing.cpu_count())
    ]
    for proc in pool:
        proc.start()
    for num in nums:
        job_queue.put(num)
    try:
        while True:
            # Timeout is our only signal that no more results coming
            print(result_queue.get(timeout=10))
    except queue.Empty:
        print("Done!")
    print(pool)  # will see one dead Process 
    for proc in pool:
        proc.kill()  # avoid stderr spam

这个“池”有点不灵活,您可能希望根据应用程序的特定需求对其进行自定义。但是您绝对可以直接跳过段错误 worker 。

当我深入这个兔子洞时,我有兴趣取消对工作池的特定提交,最终我写了一个完整的库来集成到 Trio 异步应用程序中:trio-parallel .希望您不需要走那么远!

关于Python 多处理跳过子段错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70356876/

相关文章:

python - 两组参数 python

python - Crontab 条目导致未知命令

python - 为什么当我运行调用 B.exe 的 A.exe 时失败,因为 B.exe 尝试在 A.exe(调用者)文件夹而不是 B.exe 文件夹(调用者)中查找其模块?

python - 为什么多处理锁定失败?

python - 如何在进程之间共享日期变量 - Multiprocessing python

Python 3.2 在 csv.DictReader 中跳过一行

python - 在 Python 中, open(file).read() 和 subprocess( ['cat' , file]) 之间有什么区别,两者之间是否存在偏好?

具有多个管道的 Python 子进程

python - Pygame 安装在 Windows 上,尝试调用 pygame.image.load() 时得到 "pygame.error: Failed loading libwebp-7.dll"

python - 有没有办法停止 concurrent.futures 中正在运行的进程?