Python3 Flask asyncio subprocess in route hangs

Tags: python python-3.x linux flask python-asyncio

I'm using Flask 1.0.2 with Python 3.6 on Ubuntu 18.04. My app should use asyncio and asyncio.create_subprocess_exec() to launch a background script, read stdout from it, and then return a status when the script is done.

I'm basically trying to implement an answer from this post:
Non-blocking read on a subprocess.PIPE in python

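The script launches successfully and I get all of the expected output from it, but the problem is that it never returns (meaning the Killing subprocess now line is never reached). When I check the process list (ps) from a Linux terminal, the background script has already exited.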

What am I doing wrong, and how can I successfully break out of the async for line in process.stdout loop?

At the top of my file, after my imports, I create my event loop:

# Create a loop to run all the tasks in.
asyncio.set_event_loop(None)
eventLoop = asyncio.new_event_loop()
asyncio.get_child_watcher().attach_loop(eventLoop)

I define my async coroutine above my route:
async def readAsyncFunctionAndKill(cmd):
    # Use global event loop
    global eventLoop

    print("[%s] Starting async Training Script ..." % (os.path.basename(__file__)))
    process = await asyncio.create_subprocess_exec(cmd,stdout=PIPE,loop=eventLoop)
    print("[%s] Starting to read stdout ..." % (os.path.basename(__file__)))
    async for line in process.stdout:
        line = line.decode(locale.getpreferredencoding(False))
        print("%s"%line, flush=True)
    print("[%s] Killing subprocess now ..." % (os.path.basename(__file__)))
    process.kill()
    print("[%s] Training process return code was: %s" % (os.path.basename(__file__), process.returncode))
    return await process.wait()  # wait for the child process to exit

My (abbreviated) route is here:
@app.route("/train_model", methods=["GET"])
def train_new_model():
    # Use global event loop
    global eventLoop   

    with closing(eventLoop):        
        eventLoop.run_until_complete(readAsyncFunctionAndKill("s.py"))

    return jsonify("done"), 200

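The "s.py" script being called is marked as executable and is in the same working directory. The abbreviated script looks like this (it runs several subprocesses and instantiates PyTorch classes):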
def main():

    # Ensure that swap is activated since we don't have enough RAM to train our model otherwise
    print("[%s] Activating swap now ..." % (os.path.basename(__file__)))
    subprocess.call("swapon -a", shell=True)

    # Need to initialize GPU
    print("[%s] Initializing GPU ..." % (os.path.basename(__file__)))
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    defaults.device = torch.device("cuda")
    with torch.cuda.device(0):
        torch.tensor([1.]).cuda()

    print("[%s] Cuda is Available: %s - with Name: %s ..." % (os.path.basename(__file__),torch.cuda.is_available(),torch.cuda.get_device_name(0)))

    try:

        print("[%s] Beginning to train new model and replace existing model ..." % (os.path.basename(__file__)))


        # Batch size
        bs = 16
        #bs = 8

        # Create ImageBunch
        tfms = get_transforms(do_flip=True,
                              flip_vert=True,
                              max_rotate=180.,
                              max_zoom=1.1,
                              max_lighting=0.5,
                              max_warp=0.1,
                              p_affine=0.75,
                              p_lighting=0.75)

        # Create databunch using folder names as class names
        # This also applies the transforms and batch size to the data
        os.chdir(TRAINING_DIR)
        data = ImageDataBunch.from_folder("TrainingData", ds_tfms=tfms, train='.', valid_pct=0.2, bs=bs)

        ...    

        # Create a new learner with an early stop callback
        learn = cnn_learner(data, models.resnet18, metrics=[accuracy], callback_fns=[
            partial(EarlyStoppingCallback, monitor='accuracy', min_delta=0.01, patience=3)])

        ... 

        print("[%s] All done training ..." % (os.path.basename(__file__)))

        # Success
        sys.exit(0)

    except Exception as err:

        print("[%s] Error training model [ %s ] ..." % (os.path.basename(__file__),err))
        sys.exit(255)

if __name__== "__main__":
  main()

Best answer

There are several issues here:

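  • You create a new event loop once, at import time, but then close it in your view. There is no need to close the loop at all; as written, the second request will fail because the loop is already closed.
  • The asyncio event loop is not thread-safe and should not be shared between threads. The vast majority of Flask deployments use threads to handle incoming requests. Your code has echoes of how this should be handled, but unfortunately not in the right way. For example, asyncio.get_child_watcher().attach_loop(eventLoop) is mostly redundant, because eventLoop = asyncio.new_event_loop(), when run on the main thread, already does exactly that.
    This is the prime candidate for the problem you're seeing.
  • Your code assumes that the executable actually exists and is executable. You should handle OSError exceptions (and subclasses), because an unqualified s.py only works if it has been made executable, starts with a #! shebang line, and can be found on the PATH. It won't work just because it is in the same directory, and you don't want to rely on the current working directory anyway.
  • Your code assumes that the process closes stdout at some point. If the subprocess never closes stdout (something that happens automatically when the process exits), then your async for line in process.stdout: loop will wait forever as well. Consider adding timeouts to your code to avoid getting blocked by a misbehaving subprocess.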
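The first issue is easy to reproduce in isolation. The following is a minimal sketch, not the asker's actual app: the coroutine `handle_request` and the function `view` are hypothetical stand-ins for the Flask route. Once `closing()` has closed the shared loop at the end of the first call, the second call fails with a RuntimeError instead of running.

```python
import asyncio
from contextlib import closing

# Created once at import time, like eventLoop in the question.
loop = asyncio.new_event_loop()


async def handle_request():
    # Hypothetical stand-in for the work the Flask view does.
    await asyncio.sleep(0)
    return "done"


def view():
    coro = handle_request()
    try:
        # Mirrors the question's route: run on the shared loop, then close it.
        with closing(loop):
            return loop.run_until_complete(coro)
    except RuntimeError:
        coro.close()  # the coroutine never started; close it to avoid a "never awaited" warning
        raise


first = view()  # first request: runs normally

try:
    view()  # second request: the loop was closed at the end of the first one
    second_error = None
except RuntimeError as exc:
    second_error = str(exc)
```

Dropping the `with closing(...)` wrapper avoids this particular failure, but the points about sharing one loop between request threads still apply.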

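When using asyncio subprocesses in a multithreaded application, you really want to read two sections of the Python asyncio documentation:
  • The Concurrency and Multithreading section, which explains that almost all asyncio objects are not thread-safe. You don't want to add tasks to the loop directly from other threads; you want to either use one event loop per thread, or use the asyncio.run_coroutine_threadsafe() function to run a coroutine on a loop in a specific thread.
  • For Python 3.7 and earlier, you also need to read the Subprocess and Threads section, because up to and including that version asyncio tracks child status with a non-blocking os.waitpid(-1, os.WNOHANG) call and relies on signal handling (which can only be set up on the main thread). Python 3.8 removed this restriction by adding a new child watcher implementation that makes a blocking, per-process os.waitpid() call in a separate thread, at the cost of extra memory.
    You don't have to stick with the default child watcher strategy, however. You can use EventLoopPolicy.set_child_watcher() and pass in a different process watcher instance. In practice, that means backporting the 3.8 ThreadedChildWatcher implementation.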
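Stripped to its core, the run_coroutine_threadsafe() approach from the first bullet looks roughly like this. It is a minimal sketch, not the fuller implementation further down, and it assumes Python 3.7+ because it uses asyncio.all_tasks(): one loop runs forever in a daemon thread, and any other thread (such as a Flask request thread) submits coroutines to it and blocks on the returned concurrent.futures.Future. The names `slow_add`, `call_from_any_thread` and `_run_loop` are hypothetical. Note that `future.result(timeout)` raises `concurrent.futures.TimeoutError`, which on Python 3.8 to 3.10 is a different class from `asyncio.TimeoutError`.

```python
import asyncio
import concurrent.futures
import threading

# One event loop, owned and run by a single background thread.
loop = asyncio.new_event_loop()


def _run_loop():
    asyncio.set_event_loop(loop)
    try:
        loop.run_forever()
    finally:
        # Give cancelled or leftover tasks a chance to finish before closing.
        pending = asyncio.all_tasks(loop)
        for task in pending:
            task.cancel()
        if pending:
            loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
        loop.close()


loop_thread = threading.Thread(target=_run_loop, name="asyncio-loop", daemon=True)
loop_thread.start()


async def slow_add(a, b, delay):
    # Hypothetical coroutine standing in for real async work.
    await asyncio.sleep(delay)
    return a + b


def call_from_any_thread(coro, timeout=None):
    """Run coro on the background loop and block the calling thread for its result."""
    future = asyncio.run_coroutine_threadsafe(coro, loop)  # a concurrent.futures.Future
    try:
        return future.result(timeout)
    except concurrent.futures.TimeoutError:
        future.cancel()  # also requests cancellation of the task on the loop
        return None


quick = call_from_any_thread(slow_add(1, 2, 0.01))
too_slow = call_from_any_thread(slow_add(1, 2, 5), timeout=0.1)

# Stop the loop from outside its thread; _run_loop then cleans up and closes it.
loop.call_soon_threadsafe(loop.stop)
loop_thread.join()
```

The loop is only ever touched from its own thread; other threads interact with it solely through `run_coroutine_threadsafe()` and `call_soon_threadsafe()`, which are the thread-safe entry points.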

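For your use case, there really is no need to run a new event loop per thread. Run a single loop, as needed, in a separate thread. If you use a loop in a separate thread then, depending on your Python version, you may also need a loop running on the main thread, or a different process watcher. Generally speaking, running an asyncio loop on the main thread of a WSGI server is not going to be easy, or even possible.
So you need to run a loop permanently in a separate thread, and you need a child process watcher that works without a main-thread loop. Here is an implementation for exactly that; it should work on Python 3.6 and newer: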
    import asyncio
    import itertools
    import logging
    import time
    import threading
    
    try:
        # Python 3.8 or newer has a suitable process watcher
        asyncio.ThreadedChildWatcher
    except AttributeError:
        # backport the Python 3.8 threaded child watcher
        import os
        import warnings
    
        # Python 3.7 preferred API
        _get_running_loop = getattr(asyncio, "get_running_loop", asyncio.get_event_loop)
    
        class _Py38ThreadedChildWatcher(asyncio.AbstractChildWatcher):
            def __init__(self):
                self._pid_counter = itertools.count(0)
                self._threads = {}
    
            def is_active(self):
                return True
    
            def close(self):
                pass
    
            def __enter__(self):
                return self
    
            def __exit__(self, exc_type, exc_val, exc_tb):
                pass
    
            def __del__(self, _warn=warnings.warn):
                threads = [t for t in list(self._threads.values()) if t.is_alive()]
                if threads:
                    _warn(
                        f"{self.__class__} has registered but not finished child processes",
                        ResourceWarning,
                        source=self,
                    )
    
            def add_child_handler(self, pid, callback, *args):
                loop = _get_running_loop()
                thread = threading.Thread(
                    target=self._do_waitpid,
                    name=f"waitpid-{next(self._pid_counter)}",
                    args=(loop, pid, callback, args),
                    daemon=True,
                )
                self._threads[pid] = thread
                thread.start()
    
            def remove_child_handler(self, pid):
                # asyncio never calls remove_child_handler() !!!
                # The method is no-op but is implemented because
                # abstract base class requires it
                return True
    
            def attach_loop(self, loop):
                pass
    
            def _do_waitpid(self, loop, expected_pid, callback, args):
                assert expected_pid > 0
    
                try:
                    pid, status = os.waitpid(expected_pid, 0)
                except ChildProcessError:
                    # The child process is already reaped
                    # (may happen if waitpid() is called elsewhere).
                    pid = expected_pid
                    returncode = 255
                    logger.warning(
                        "Unknown child process pid %d, will report returncode 255", pid
                    )
                else:
                    if os.WIFSIGNALED(status):
                        returncode = -os.WTERMSIG(status)
                    elif os.WIFEXITED(status):
                        returncode = os.WEXITSTATUS(status)
                    else:
                        returncode = status
    
                    if loop.get_debug():
                        logger.debug(
                            "process %s exited with returncode %s", expected_pid, returncode
                        )
    
                if loop.is_closed():
                    logger.warning("Loop %r that handles pid %r is closed", loop, pid)
                else:
                    loop.call_soon_threadsafe(callback, pid, returncode, *args)
    
                self._threads.pop(expected_pid)
    
        # add the watcher to the loop policy
        asyncio.get_event_loop_policy().set_child_watcher(_Py38ThreadedChildWatcher())
    
    __all__ = ["EventLoopThread", "get_event_loop", "stop_event_loop", "run_coroutine"]
    
    logger = logging.getLogger(__name__)
    
    class EventLoopThread(threading.Thread):
        loop = None
        _count = itertools.count(0)
    
        def __init__(self):
            name = f"{type(self).__name__}-{next(self._count)}"
            super().__init__(name=name, daemon=True)
    
        def __repr__(self):
            loop, r, c, d = self.loop, False, True, False
            if loop is not None:
                r, c, d = loop.is_running(), loop.is_closed(), loop.get_debug()
            return (
                f"<{type(self).__name__} {self.name} id={self.ident} "
                f"running={r} closed={c} debug={d}>"
            )
    
        def run(self):
            self.loop = loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
    
            try:
                loop.run_forever()
            finally:
                try:
                    shutdown_asyncgens = loop.shutdown_asyncgens()
                except AttributeError:
                    pass
                else:
                    loop.run_until_complete(shutdown_asyncgens)
                loop.close()
                asyncio.set_event_loop(None)
    
        def stop(self):
            loop, self.loop = self.loop, None
            if loop is None:
                return
            loop.call_soon_threadsafe(loop.stop)
            self.join()
    
    _lock = threading.Lock()
    _loop_thread = None
    
    def get_event_loop():
        global _loop_thread
        if _loop_thread is None:
            with _lock:
                if _loop_thread is None:
                    _loop_thread = EventLoopThread()
                    _loop_thread.start()
                    # give the thread up to a second to produce a loop
                    deadline = time.time() + 1
                    while not _loop_thread.loop and time.time() < deadline:
                        time.sleep(0.001)
    
        return _loop_thread.loop
    
    def stop_event_loop():
        global _loop_thread
        with _lock:
            if _loop_thread is not None:
                _loop_thread.stop()
                _loop_thread = None
    
    def run_coroutine(coro):
        return asyncio.run_coroutine_threadsafe(coro, get_event_loop())
    
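The above is the same generic "run asyncio with Flask" solution I posted for Make a Python asyncio call from a Flask route, with the ThreadedChildWatcher backport added.
You can then use the loop returned by get_event_loop() to run child processes, by submitting coroutines with run_coroutine() (a thin wrapper around asyncio.run_coroutine_threadsafe()):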
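    import asyncio
    import concurrent.futures
    import locale
    import logging
    
    logger = logging.getLogger(__name__)
    
    
    def get_command_output(cmd, timeout=None):
        encoding = locale.getpreferredencoding(False)
    
        def kill_quietly(process):
            # kill() raises ProcessLookupError if the child has already gone away
            if process.returncode is None:
                try:
                    process.kill()
                except ProcessLookupError:
                    pass
    
        async def run_async():
            try:
                process = await asyncio.create_subprocess_exec(
                    cmd, stdout=asyncio.subprocess.PIPE)
            except OSError:
                logger.exception("Process %s could not be started", cmd)
                return None
    
            try:
                async for line in process.stdout:
                    line = line.decode(encoding)
                    # TODO: actually do something with the data.
                    print(line, flush=True)
            except asyncio.CancelledError:
                # The caller timed out and cancelled this task; don't leave the child running.
                kill_quietly(process)
                raise
    
            # stdout reached EOF; kill the child in case it closed stdout but kept running.
            kill_quietly(process)
            returncode = await process.wait()  # wait for the child process to exit
            logger.debug("Process for %s exited with %i", cmd, returncode)
            return returncode
    
        future = run_coroutine(run_async())
        result = None
        try:
            result = future.result(timeout)
        except concurrent.futures.TimeoutError:
            # future.result() raises concurrent.futures.TimeoutError; on Python 3.8-3.10
            # that is a different class from asyncio.TimeoutError.
            logger.warning('The child process took too long, cancelling the task...')
            future.cancel()
        except Exception:
            logger.exception('The child process raised an exception')
        return result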
    
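Note that the above function takes an optional timeout, in seconds: the maximum amount of time you are willing to wait for the subprocess to complete.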
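The timeout above bounds the whole run. The stdout bullet earlier also suggests guarding against a child that goes silent without exiting; one way to do that is a per-line timeout. What follows is a minimal sketch of that variant, not a drop-in replacement for get_command_output(): the name `read_lines_with_timeout`, its `(lines, returncode, timed_out)` return value, and the `/nonexistent/s.py` path are made up for illustration. The usage lines launch the child through `sys.executable`, so no shebang or executable bit is needed, and they use `asyncio.run()` (Python 3.7+). Inside Flask you would instead submit the coroutine to the background loop.

```python
import asyncio
import sys


async def read_lines_with_timeout(argv, line_timeout):
    """Return (lines, returncode, timed_out); returncode is None if argv could not be started."""
    try:
        process = await asyncio.create_subprocess_exec(
            *argv, stdout=asyncio.subprocess.PIPE)
    except OSError:
        # Missing file, no execute permission, bad shebang, ...
        return [], None, False

    lines = []
    timed_out = False
    while True:
        try:
            raw = await asyncio.wait_for(process.stdout.readline(), line_timeout)
        except asyncio.TimeoutError:
            # No complete line within line_timeout seconds: give up on the child.
            timed_out = True
            try:
                process.kill()
            except ProcessLookupError:
                pass  # it exited in the meantime
            break
        if not raw:
            # b"" means EOF: every writer has closed the pipe.
            break
        lines.append(raw.decode().rstrip("\r\n"))

    returncode = await process.wait()
    return lines, returncode, timed_out


ok = asyncio.run(read_lines_with_timeout(
    [sys.executable, "-c", "print('epoch 1'); print('epoch 2')"], line_timeout=10))
stuck = asyncio.run(read_lines_with_timeout(
    [sys.executable, "-c", "import time; print('start', flush=True); time.sleep(60)"],
    line_timeout=3))
missing = asyncio.run(read_lines_with_timeout(["/nonexistent/s.py"], line_timeout=1))
```

Here `stuck` comes back with the one line it printed, `timed_out` set, and a non-zero (signal) return code, while `missing` reports the start failure instead of raising.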

For Python3 Flask asyncio subprocess hangs in a route, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/58547753/
