python - Cleaning up long-running child processes launched from a Flask MethodView API

Tags: python flask multiprocessing

I'm building a Flask MethodView-driven API. For one endpoint, I use the request data to launch a potentially long-running command. Rather than wait for the command to finish, I wrap it in a multiprocessing.Process, call start, and return HTTP 202 to the user along with a URL they can use to monitor the status of the process.

from multiprocessing import Process

from flask import abort, jsonify, request
from flask.views import MethodView


class EndPointAPI(MethodView):

    def __init__(self):
        """ On init, filter requests missing JSON body."""

        # Check for a JSON payload; the listed request methods are exempt
        self.exempt_methods = ["GET", "PUT", "DELETE"]
        if (request.method not in self.exempt_methods) and not request.json:
            abort(400)

    def _long_running_function(self, json_data):
        """ 
        In this function, I use the input JSON data 
        to write a script to the file system, then 
        use subprocess.run to execute it.
        """
        return

    def post(self):
        """ """

        # Get input data
        json_data = request.json

        # Kick off the long running function
        p = Process(target=self._long_running_function, args=(json_data,))
        p.start()

        response = {
            "result": "job accepted",
            "links": {
                "href": "/monitor_job/",
            },
        }

        return jsonify(response), 202

It looks like the processes started in the post method turn into zombies once they finish, but I can't figure out how to track and clean them up properly without blocking execution of the parent method. I tried implementing the monitoring thread suggested in Python join a process without blocking parent. As I understand it, the idea is to run a separate thread that watches a FIFO queue, and to put the process handle on that queue before returning from the parent function. I tried an implementation (below), but it looks like you can't pass the process object into the thread this way, because it contains a protected AuthenticationString attribute.

Traceback (most recent call last):
  File "/opt/miniconda3/envs/m137p3/lib/python3.6/multiprocessing/queues.py", line 234, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/opt/miniconda3/envs/m137p3/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "/opt/miniconda3/envs/m137p3/lib/python3.6/multiprocessing/process.py", line 291, in __reduce__
    'Pickling an AuthenticationString object is '
TypeError: Pickling an AuthenticationString object is disallowed for security reasons

Here is my implementation of Python join a process without blocking parent. I don't know whether it would work, because the error above shuts the whole thing down right at the start. Any thoughts or suggestions on how I can launch these processes responsibly without blocking the calling method would be greatly appreciated.

from threading import Thread
from multiprocessing import Process, Queue

from flask import abort, jsonify, request
from flask.views import MethodView

class Joiner(Thread):

    def __init__(self, q):
        super().__init__()
        self.__q = q

    def run(self):
        while True:
            child = self.__q.get()
            if child is None:
                return
            child.join()

class EndPointAPI(MethodView):

    def __init__(self):
        """ On init, filter requests missing JSON body."""
        self._jobs = Queue()            
        self._babysitter = Joiner(self._jobs)
        self._babysitter.start()

        # Check for a JSON payload; the listed request methods are exempt
        self.exempt_methods = ["GET", "PUT", "DELETE"]
        if (request.method not in self.exempt_methods) and not request.json:
            abort(400)

    def _long_running_function(self, json_data):
        """ 
        In this function, I use the input JSON data 
        to write a script to the file system, then 
        use subprocess.run to execute it.
        """
        return

    def post(self):
        """ """

        # Get input data
        json_data = request.json

        # Kick off the long running function
        p = Process(target=self._long_running_function, args=(json_data,))
        p.start()
        self._jobs.put(p)

        response = {
            "result": "job accepted",
            "links": {
                "href": "/monitor_job/",
            },
        }

        return jsonify(response), 202

Best Answer

You're very close :) Everything looks fine except for one thing: you are using a multiprocessing.Queue to store the running processes so that the Joiner instance can join them later. From the docs you will learn the following:

Note: When an object is put on a queue, the object is pickled and a background thread later flushes the pickled data to an underlying pipe.

That is, processes are pickled when they are put on the queue, which is what produces the following error:

TypeError: Pickling an AuthenticationString object is disallowed for security reasons

This happens because of the unique authentication key that every process has. This key is a byte string that can be thought of as a password; it is of type multiprocessing.process.AuthenticationString and cannot be pickled.
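As a quick illustration (this snippet is not from the original post), trying to pickle a Process object directly reproduces the same failure, because the pickler reaches the process's authkey:

import pickle
from multiprocessing import Process

p = Process(target=print, args=("hello",))
print(type(p.authkey))  # <class 'multiprocessing.process.AuthenticationString'>

try:
    # multiprocessing.Queue does this same pickling internally before
    # writing the object to its underlying pipe.
    pickle.dumps(p)
except TypeError as exc:
    print(exc)  # Pickling an AuthenticationString object is disallowed ...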

The solution is simple: just use a queue.Queue instance to store the long-running processes. Here is a working example:

#!/usr/bin/env python3
import os
import time
from queue import Queue
from threading import Thread
from multiprocessing import Process


class Joiner(Thread):

    def __init__(self):
        super().__init__()
        self.workers = Queue()

    def run(self):

        while True:
            worker = self.workers.get()

            if worker is None:
                break

            worker.join()


def do_work(t):
    pid = os.getpid()
    print('Process', pid, 'STARTED')
    time.sleep(t)
    print('Process', pid, 'FINISHED')


if __name__ == '__main__':
    joiner = Joiner()
    joiner.start()

    for t in range(1, 6, 2):
        p = Process(target=do_work, args=(t,))
        p.start()
        joiner.workers.put(p)

    joiner.workers.put(None)
    joiner.join()

Output:

Process 14498 STARTED
Process 14500 STARTED
Process 14499 STARTED
Process 14498 FINISHED
Process 14499 FINISHED
Process 14500 FINISHED

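For completeness, here is a sketch of how the same fix slots into the original MethodView: the only real change is storing the started Process objects in a queue.Queue instead of a multiprocessing.Queue. It assumes a single module-level queue and Joiner (Flask instantiates a MethodView per request, so creating them in __init__ would start a new Joiner thread on every call), the JSON-body check from the question is omitted for brevity, and the names jobs and joiner are illustrative rather than taken from the original post.

from queue import Queue
from threading import Thread
from multiprocessing import Process

from flask import jsonify, request
from flask.views import MethodView


class Joiner(Thread):
    """ Reap finished worker processes so they do not linger as zombies. """

    def __init__(self, q):
        super().__init__(daemon=True)
        self.__q = q

    def run(self):
        while True:
            child = self.__q.get()
            if child is None:
                return
            child.join()


# One queue and one Joiner shared by the whole application,
# instead of one per request in EndPointAPI.__init__.
jobs = Queue()
joiner = Joiner(jobs)
joiner.start()


class EndPointAPI(MethodView):

    def _long_running_function(self, json_data):
        # Write the script to disk and run it with subprocess.run, as in the question.
        return

    def post(self):
        json_data = request.json

        # Start the job and hand the handle to the joiner;
        # queue.Queue stores the Process object without pickling it.
        p = Process(target=self._long_running_function, args=(json_data,))
        p.start()
        jobs.put(p)

        response = {
            "result": "job accepted",
            "links": {
                "href": "/monitor_job/",
            },
        }
        return jsonify(response), 202
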
Regarding python - Cleaning up long-running child processes launched from a Flask MethodView API, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/58294838/
