python - 将 python 子进程作业的输出获取到 Tornado 中

标签 python asynchronous subprocess tornado

我搜索了很多,但没有找到如何将正在运行的 python 子进程的输出获取到 Tornado 中。我想要的是类似 Travis CI 的东西。在管理页面中,我将启 Action 业,服务器将接收请求并启动子进程。该子进程将进行一些数据挖掘,并向字符串缓冲区提供一些日志。我将使用带有 settimeout 的 ajax 或 websocket 获取此日志,并将此日志输出到页面中。即使用户关闭页面并稍后返回,也会有日志并且通常会更新。嗯,确实和 Travis 非常相似。

最佳答案

这篇博文展示了一种执行此操作的方法:http://stefaanlippens.net/python-asynchronous-subprocess-pipe-reading

这篇文章本质上展示了如何通过异步读取 stdout 和 stderr 来读取进程的输出时防止死锁。您可以将 __main__ 中的 Producer 命令替换为运行您喜欢的任何命令,并将 print 语句替换为处理 Tornado 中输出的代码。

更新:我已添加以下内容,以防博客被删除:

...what if you want to read standard output and error line by line, for example because you want to monitor a longer running process? On the web you can find many solutions, with varying degrees of complexity, abstraction and dependencies. One solution (with limited code and no dependencies outside the standard library) is to read the pipes in separate threads, so one pipe can't block another.

The code below shows an example implementation. The script is set up in such a way that is used both for the parent as the child process.

For the child process: when called with 'produce' argument, it runs the produce() function that just renders some lines randomly on standard output and standard error. Between the lines there is a touch of delay simulate a longer running process. The parent process (script called without arguments), implemented in the consume() function, invokes the same script in "child mode" as subprocess and monitors its output line by line, without knowing in advance from which pipe each line will come.

The AsynchronousFileReader class is for the threads that will read the standard output and error pipes asynchronously and put each line on a queue. The main thread can then monitor the subprocess by watching the lines as they come in on the queues.

import sys
import subprocess
import random
import time
import threading
import Queue

class AsynchronousFileReader(threading.Thread):
    '''
    Helper class to implement asynchronous reading of a file
    in a separate thread. Pushes read lines on a queue to
    be consumed in another thread.
    '''

    def __init__(self, fd, queue):
        assert isinstance(queue, Queue.Queue)
        assert callable(fd.readline)
        threading.Thread.__init__(self)
        self._fd = fd
        self._queue = queue

    def run(self):
        '''The body of the tread: read lines and put them on the queue.'''
        for line in iter(self._fd.readline, ''):
            self._queue.put(line)

    def eof(self):
        '''Check whether there is no more content to expect.'''
        return not self.is_alive() and self._queue.empty()

def consume(command):
    '''
    Example of how to consume standard output and standard error of
    a subprocess asynchronously without risk on deadlocking.
    '''

    # Launch the command as subprocess.
    process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # Launch the asynchronous readers of the process' stdout and stderr.
    stdout_queue = Queue.Queue()
    stdout_reader = AsynchronousFileReader(process.stdout, stdout_queue)
    stdout_reader.start()
    stderr_queue = Queue.Queue()
    stderr_reader = AsynchronousFileReader(process.stderr, stderr_queue)
    stderr_reader.start()

    # Check the queues if we received some output (until there is nothing more to get).
    while not stdout_reader.eof() or not stderr_reader.eof():
        # Show what we received from standard output.
        while not stdout_queue.empty():
            line = stdout_queue.get()
            print 'Received line on standard output: ' + repr(line)

        # Show what we received from standard error.
        while not stderr_queue.empty():
            line = stderr_queue.get()
            print 'Received line on standard error: ' + repr(line)

        # Sleep a bit before asking the readers again.
        time.sleep(.1)

    # Let's be tidy and join the threads we've started.
    stdout_reader.join()
    stderr_reader.join()

    # Close subprocess' file descriptors.
    process.stdout.close()
    process.stderr.close()

def produce(items=10):
    '''
    Dummy function to randomly render a couple of lines
    on standard output and standard error.
    '''
    for i in range(items):
        output = random.choice([sys.stdout, sys.stderr])
        output.write('Line %d on %s\n' % (i, output))
        output.flush()
        time.sleep(random.uniform(.1, 1))

if __name__ == '__main__':
    # The main flow:
    # if there is an command line argument 'produce', act as a producer
    # otherwise be a consumer (which launches a producer as subprocess).
    if len(sys.argv) == 2 and sys.argv[1] == 'produce':
        produce(10)
    else:
        consume(['python', sys.argv[0], 'produce'])

关于python - 将 python 子进程作业的输出获取到 Tornado 中,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17735018/

相关文章:

c# - 异步更新 ASP.NET 面板

asynchronous - 出站 ChannelHandler 的全面异常处理

python - 根据正则表达式中的相同值选择组

javascript - NodeJS Mongodb 与数组不同不起作用

python - 在标签不在训练集中的测试数据上使用 MultilabelBinarizer

python - Go 子进程通信

python - 如何在 Python 中使用 dir/s 命令?

Python-yad 进度条在 python 3.4 中不起作用,但在 python 2.7 中起作用

python - django 中未定义 DateTimeField 错误

python - 将字符串转换为数学函数: "name sint is not defined"