python - 监控流类

标签 python stream

我正在尝试创建一个流对象,只要将数据写入其中,该对象就会触发回调函数。

class MonitoredStream():
    def __init__(self, outstream, callback):
        self.outstream = outstream
        self.callback = callback

    def write(self, s):
        self.callback(s)
        self.outstream.write(s)

    def __getattr__(self, attr):
        return getattr(self.outstream, attr)

当我直接调用 write 方法时,这工作得很好,但是当我有一个子进程的输出挂接到流时,我希望它也能工作。例如:

def f(s):
    print("Write")

p = sub.Popen(["sh", "test.sh"], stdout=MonitoredStream(sys.stdout, f))
p.communicate()

这只是将输出直接发送到 sys.stdout,完全绕过写入功能。有没有办法也可以监视此输出?

最佳答案

我认为这里的问题是 subprocess.Popen 不使用管道的 Python 接口(interface) - 它而是获取文件描述符,然后使用它直接写入管道,其中,当您给出 stdout 管道的属性时,意味着它使用该管道,绕过您的代码。

解决这个问题的最佳猜测是在中间创建一个新的中间管道,让您自己处理流。我会将其实现为上下文管理器:

import sys
import os
from subprocess import Popen
from contextlib import contextmanager

@contextmanager
def monitor(stream, callback):
    read, write = os.pipe()
    yield write
    os.close(write)
    with os.fdopen(read) as f:
        for line in f:
            callback(line)
            stream.write(line)

def f(s):
    print("Write")

with monitor(sys.stdout, f) as stream:
    p = Popen(["ls"], stdout=stream)
    p.communicate()

当然,尽管您仍然可以使用类:

import sys
import os
from subprocess import Popen

class MonitoredStream():
    def __init__(self, stream, callback):
        self.stream = stream
        self.callback = callback
        self._read, self._write = os.pipe()

    def fileno(self):
        return self._write

    def process(self):
        os.close(self._write)
        with os.fdopen(self._read) as f:
            for line in f:
                self.callback(line)
                self.stream.write(line)

def f(s):
    print("Write")

stream = MonitoredStream(sys.stdout, f)
p = Popen(["ls"], stdout=stream)
p.communicate()
print(stream.process())

虽然我觉得这不太优雅。

关于python - 监控流类,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/10954544/

相关文章:

python - 在cmd中设置类似于java和python的变量名

python - 使用 python 3.3 在 Django 1.9 中导入 WeakMethod 错误

python - 输入文本选项 : curious behavior

python - 使用 Python pip 安装 azure-iothub-device-client 时出错

c++ - 在 g++ 下获取 stringstream >> my_double 以给出无限值或 NaN 的方法?

c# - 有什么理由 GZipStream.Read 会返回少于请求的字节数吗?

python - 警报信号未在无限循环中触发

c++ - 来自 file_descriptor_source (boost::iostreams) 或文件的 istream

java - 在传递给 Java 8 Streams 的映射函数中实现检查异常处理的更好方法是什么

c++ - const char * 到 std::basic_iostream