python - 为什么使用 xargs --max-procs 加扰具有无缓冲输出的 python 进程?

标签 python pipe stdout buffered

我正在执行多个像这样的 python 进程:

find /path/to/logfiles/*.gz | xargs -n1 -P4 python logparser.py

并且输出偶尔会被打乱。

输出流是无缓冲的,写入的大小更小 比默认系统(osx 10.8.2,python 2.7.2)定义的 PIPE_BUF 512 字节,所以我相信写入应该是原子的,但输出偶尔会被打乱。我一定遗漏了一些东西,如有任何建议,我们将不胜感激。

谢谢。

脚本的简化框架是:

import argparse
import csv
import gzip


class class UnbufferedWriter(object):
    """Unbuffered Writer from 
       http://mail.python.org/pipermail/tutor/2003-November/026645.html

    """

    def __init__(self, stream):
        self.stream = stream

    def write(self, data):
        self.stream.write(data)
        self.stream.flush()

    def __getattr__(self, attr):
        return getattr(self.stream, attr)


def parse_records(infile):
    if infile.name.endswith('.gz'):
        lines = gzip.GzipFile(fileobj=infile)
    else:
        lines = infile

    for line in lines:
        # match lines with regex and filter out on some conditions.
        yield line_as_dict

def main(infile, outfile):
    fields = ['remote_addr', 'time', 'request_time', 'request', 'status']
    writer = csv.DictWriter(outfile, fields, quoting=csv.QUOTE_ALL)

    for record in parse_records(infile):
        row_as_dict = dict(
            remote_addr=record.get('remote_addr', ''),
            time=record.get('time', ''),
            request_time=record.get('request_time', ''),
            request=record.get('request', ''),
            status=record.get('status', '')
        )
        writer.writerow(row_as_dict)

if __name__ == '__main__':

    parser = argparse.ArgumentParser()
    parser.add_argument('infile', nargs='?', type=argparse.FileType('r'), default=sys.stdin)
    parser.add_argument('outfile', nargs='?', type=argparse.FileType('w', 0), default=sys.stdout)

    pargs = parser.parse_args()
    pargs.outfile = UnbufferedWriter(pargs.outfile)

    main(pargs.infile, pargs.outfile)

最佳答案

您可能要考虑使用 GNU Parallel .默认情况下,输出会被缓冲,直到实例完成运行:

When running jobs that output data, you often do not want the output of multiple jobs to run together. GNU parallel defaults to grouping the output of each job, so the output is printed when the job finishes. If you want the output to be printed while the job is running you can use -u.

我相信运行脚本的最佳方式是 vai:

find /path/to/logfiles/*.gz | parallel python logparser.py

parallel python logparser.py ::: /path/to/logfiles/*.gz

您可以使用 -j 标志指定要运行的进程数,即 -j4

Parallel 的好处在于它支持输入参数的笛卡尔积。例如,如果您想要为每个文件迭代一些额外的参数,您可以使用:

parallel python logparser.py ::: /path/to/logfiles/*.gz ::: 1 2 3

这将导致跨多个进程运行以下命令:

python logparser.py /path/to/logfiles/A.gz 1
python logparser.py /path/to/logfiles/A.gz 2
python logparser.py /path/to/logfiles/A.gz 3
python logparser.py /path/to/logfiles/B.gz 1
python logparser.py /path/to/logfiles/B.gz 2
python logparser.py /path/to/logfiles/B.gz 3
...

祝你好运!

关于python - 为什么使用 xargs --max-procs 加扰具有无缓冲输出的 python 进程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14367538/

相关文章:

linux - Mac OS X 终端 - 我该如何 "clear | java foo"?

python - 是否需要为他们定位的每个站点编写爬虫?

python - PostgreSQL-ModuleNotFoundError : No module named 'psycopg2'

bash - git log 的输出在通过管道传输到文件时丢失了——我错过了什么?

c++ - 将一个程序的输出通过管道传输到另一个程序不适用于该特定程序

python - 关于 python 中 pexpect 的问题

python - 删除 ANSI 颜色代码时打印到 STDOUT 和日志文件

python - Python中的字母序列

python - OpenCV 中的 Canny 可以同时处理灰度和彩色图像吗?

linux - 为什么我在 shell 中读取 fifo 时出错?