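python - How to handle abnormal child process termination?

Tags: python process multiprocessing queue python-multiprocessing

I'm using Python 3.7 and following this documentation. I want a process that spawns a child process, waits for it to finish its task, and gets some information back. I use the following code: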

from multiprocessing import Process, Queue

if __name__ == '__main__':
    q = Queue()
    p = Process(target=some_func, args=(q,))
    p.start()
    print(q.get())  # blocks until the child puts something on the queue
    p.join()

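When the child process finishes correctly there is no problem and it works great, but the trouble starts when the child process is terminated before it finishes. In that case my application hangs, waiting forever.

Giving q.get() and p.join() a timeout does not fully solve the problem, because I want to know immediately that the child process has died rather than wait for the timeout.

Another problem is that a timeout on q.get() raises an exception, which I would prefer to avoid.

Can someone suggest a more elegant way to overcome these problems?

Best answer

Queue & Signals

One possibility is to register a signal handler and use it to pass a sentinel value. On Unix you could handle SIGCHLD in the parent, but that is not an option in your case. According to the signal module: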

On Windows, signal() can only be called with SIGABRT, SIGFPE, SIGILL, SIGINT, SIGSEGV, SIGTERM, or SIGBREAK.

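I'm not sure whether killing the process through Task Manager translates into SIGTERM, but you can give it a try.

To handle SIGTERM, you need to register the signal handler in the child process.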

import os
import sys
import time
import signal
from functools import partial
from multiprocessing import Process, Queue

SENTINEL = None


def _sigterm_handler(signum, frame, queue):
    print("received SIGTERM")
    queue.put(SENTINEL)
    sys.exit()


def register_sigterm(queue):
    global _sigterm_handler
    _sigterm_handler = partial(_sigterm_handler, queue=queue)
    signal.signal(signal.SIGTERM, _sigterm_handler)


def some_func(q):
    register_sigterm(q)
    print(os.getpid())
    for i in range(30):
        time.sleep(1)
        q.put(f'msg_{i}')


if __name__ == '__main__':

    q = Queue()
    p = Process(target=some_func, args=(q,))
    p.start()
    for msg in iter(q.get, SENTINEL):
        print(msg)
    p.join()

Example output:

12273
msg_0
msg_1
msg_2
msg_3
received SIGTERM

Process finished with exit code 0
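
One way to try the handler without Task Manager is to let the parent send the signal itself. The following is a minimal sketch, not part of the original answer. It assumes a POSIX system, where `Process.terminate()` sends SIGTERM. On Windows `terminate()` uses `TerminateProcess()`, so the handler would never run and the receive loop would block. `make_sigterm_handler` is a hypothetical helper that binds the queue with a closure instead of rebinding a global.

```python
import signal
import sys
import time
from multiprocessing import Process, Queue

SENTINEL = None


def make_sigterm_handler(queue):
    """Build a SIGTERM handler bound to `queue` (hypothetical helper)."""
    def handler(signum, frame):
        queue.put(SENTINEL)  # tell the parent that no more messages will come
        sys.exit()           # raise SystemExit so the child shuts down cleanly
    return handler


def some_func(q):
    signal.signal(signal.SIGTERM, make_sigterm_handler(q))
    for i in range(30):
        q.put(f'msg_{i}')
        time.sleep(0.1)


if __name__ == '__main__':
    q = Queue()
    p = Process(target=some_func, args=(q,))
    p.start()
    print(q.get())   # the first message is sent after the handler is installed
    p.terminate()    # SIGTERM on POSIX (assumption: not running on Windows)
    for msg in iter(q.get, SENTINEL):
        print(msg)
    p.join()
    print('exitcode:', p.exitcode)
```

Waiting for the first message before calling `terminate()` avoids signalling the child before its handler is registered, in which case the sentinel would never arrive.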

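Queue & Process.is_alive()

Even if this works with Task Manager, your use case sounds like you cannot rule out forced kills, so I think you are better off with an approach that does not rely on signals.

You can check in a loop whether your process is still alive with p.is_alive(), call queue.get() with a timeout, and handle the Empty exception: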

import os
import time
from queue import Empty
from multiprocessing import Process, Queue

def some_func(q):
    print(os.getpid())
    for i in range(30):
        time.sleep(1)
        q.put(f'msg_{i}')


if __name__ == '__main__':

    q = Queue()
    p = Process(target=some_func, args=(q,))
    p.start()

    while p.is_alive():
        try:
            msg = q.get(timeout=0.1)
        except Empty:
            pass
        else:
            print(msg)

    p.join()
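
A child may put a message on the queue and exit between two iterations of that loop, so something can still be waiting in the queue once is_alive() returns False. The sketch below is an addition to the answer, not the author's code. It drains the queue after the loop and uses `Process.exitcode` to tell a normal exit from an abnormal one: per the multiprocessing documentation, a negative value -N means the child was terminated by signal N. `drain` and `describe_exit` are hypothetical helper names, and the worker exits with code 3 only to simulate a failure.

```python
import sys
import time
from queue import Empty
from multiprocessing import Process, Queue


def some_func(q):
    # Hypothetical worker: sends a few messages, then exits with an error code.
    for i in range(3):
        time.sleep(0.1)
        q.put(f'msg_{i}')
    sys.exit(3)


def drain(q, timeout=0.1):
    """Collect everything still readable from `q` without raising Empty."""
    items = []
    while True:
        try:
            items.append(q.get(timeout=timeout))
        except Empty:
            return items


def describe_exit(exitcode):
    """Turn Process.exitcode into a short human-readable status."""
    if exitcode is None:
        return 'still running'
    if exitcode == 0:
        return 'finished normally'
    if exitcode < 0:
        return f'killed by signal {-exitcode}'
    return f'exited with error code {exitcode}'


if __name__ == '__main__':
    q = Queue()
    p = Process(target=some_func, args=(q,))
    p.start()

    while p.is_alive():
        try:
            print(q.get(timeout=0.1))
        except Empty:
            pass

    for msg in drain(q):  # pick up anything sent just before the child died
        print(msg)
    p.join()
    print(describe_exit(p.exitcode))
```

The caller never sees the Empty exception, because it is handled inside `drain`.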

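It is also possible to avoid the exception, but I would not recommend it, because you do not spend your waiting time "on the queue", which reduces responsiveness: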

while p.is_alive():
    if not q.empty():
        msg = q.get_nowait()
        print(msg)
    time.sleep(0.1)  # sleep on every iteration so an empty queue does not busy-spin

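Pipe & Process.is_alive()

If you plan to use one connection per child, you can use a pipe instead of a queue. It performs better than a queue (which is built on top of a pipe), and you can use multiprocessing.connection.wait (Python 3.3+) to wait for several objects to become ready at once.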

multiprocessing.connection.wait(object_list, timeout=None)

Wait till an object in object_list is ready. Returns the list of those objects in object_list which are ready. If timeout is a float then the call blocks for at most that many seconds. If timeout is None then it will block for an unlimited period. A negative timeout is equivalent to a zero timeout.

For both Unix and Windows, an object can appear in object_list if it is a readable Connection object; a connected and readable socket.socket object; or the sentinel attribute of a Process object. A connection or socket object is ready when there is data available to be read from it, or the other end has been closed.

Unix: wait(object_list, timeout) almost equivalent select.select(object_list, [], [], timeout). The difference is that, if select.select() is interrupted by a signal, it can raise OSError with an error number of EINTR, whereas wait() will not.

Windows: An item in object_list must either be an integer handle which is waitable (according to the definition used by the documentation of the Win32 function WaitForMultipleObjects()) or it can be an object with a fileno() method which returns a socket handle or pipe handle. (Note that pipe handles and socket handles are not waitable handles.)

You can use this to wait on the process's sentinel attribute and the parent's end of the pipe at the same time.

import os
import time
from multiprocessing import Process, Pipe
from multiprocessing.connection import wait


def some_func(conn_write):
    print(os.getpid())
    for i in range(30):
        time.sleep(1)
        conn_write.send(f'msg_{i}')


if __name__ == '__main__':

    conn_read, conn_write = Pipe(duplex=False)
    p = Process(target=some_func, args=(conn_write,))
    p.start()

    while p.is_alive():
        wait([p.sentinel, conn_read])  # block-wait until something gets ready
        if conn_read.poll():  # check if something can be received
            print(conn_read.recv())
    p.join()
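
The loop above reads at most one message per wake-up, so if the child sends several messages in quick succession and then exits, some of them may still be buffered in the pipe when is_alive() turns False. The following sketch is an addition, not the author's code. It reads everything that is available each time and once more after the child has gone. `drain_conn` is a hypothetical helper, and the worker sends without sleeping only to provoke that situation.

```python
from multiprocessing import Process, Pipe
from multiprocessing.connection import wait


def some_func(conn_write):
    # Hypothetical worker: sends a burst of messages, then exits right away.
    for i in range(3):
        conn_write.send(f'msg_{i}')


def drain_conn(conn):
    """Return every message currently readable on `conn` without blocking."""
    msgs = []
    try:
        while conn.poll():            # True while data (or EOF) is pending
            msgs.append(conn.recv())
    except (EOFError, OSError):       # the other end was closed, nothing more to read
        pass
    return msgs


if __name__ == '__main__':
    conn_read, conn_write = Pipe(duplex=False)
    p = Process(target=some_func, args=(conn_write,))
    p.start()

    while p.is_alive():
        wait([p.sentinel, conn_read])      # wake up on new data or on child exit
        for msg in drain_conn(conn_read):
            print(msg)

    for msg in drain_conn(conn_read):      # messages sent just before the exit
        print(msg)
    p.join()
    print('exitcode:', p.exitcode)
```

Because the parent keeps its own copy of the write end open here, the read end does not report EOF when the child dies. The sentinel is what signals the exit.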

Regarding "python - How to handle abnormal child process termination?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/53088232/
