我正在执行多个像这样的 python 进程:
find /path/to/logfiles/*.gz | xargs -n1 -P4 python logparser.py
并且输出偶尔会被打乱。
输出流是无缓冲的,写入的大小更小 比默认系统(osx 10.8.2,python 2.7.2)定义的 PIPE_BUF 512 字节,所以我相信写入应该是原子的,但输出偶尔会被打乱。我一定遗漏了一些东西,如有任何建议,我们将不胜感激。
谢谢。
脚本的简化框架是:
import argparse
import csv
import gzip
class class UnbufferedWriter(object):
"""Unbuffered Writer from
http://mail.python.org/pipermail/tutor/2003-November/026645.html
"""
def __init__(self, stream):
self.stream = stream
def write(self, data):
self.stream.write(data)
self.stream.flush()
def __getattr__(self, attr):
return getattr(self.stream, attr)
def parse_records(infile):
if infile.name.endswith('.gz'):
lines = gzip.GzipFile(fileobj=infile)
else:
lines = infile
for line in lines:
# match lines with regex and filter out on some conditions.
yield line_as_dict
def main(infile, outfile):
fields = ['remote_addr', 'time', 'request_time', 'request', 'status']
writer = csv.DictWriter(outfile, fields, quoting=csv.QUOTE_ALL)
for record in parse_records(infile):
row_as_dict = dict(
remote_addr=record.get('remote_addr', ''),
time=record.get('time', ''),
request_time=record.get('request_time', ''),
request=record.get('request', ''),
status=record.get('status', '')
)
writer.writerow(row_as_dict)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('infile', nargs='?', type=argparse.FileType('r'), default=sys.stdin)
parser.add_argument('outfile', nargs='?', type=argparse.FileType('w', 0), default=sys.stdout)
pargs = parser.parse_args()
pargs.outfile = UnbufferedWriter(pargs.outfile)
main(pargs.infile, pargs.outfile)
最佳答案
您可能要考虑使用 GNU Parallel .默认情况下,输出会被缓冲,直到实例完成运行:
When running jobs that output data, you often do not want the output of multiple jobs to run together. GNU parallel defaults to grouping the output of each job, so the output is printed when the job finishes. If you want the output to be printed while the job is running you can use -u.
我相信运行脚本的最佳方式是 vai:
find /path/to/logfiles/*.gz | parallel python logparser.py
或
parallel python logparser.py ::: /path/to/logfiles/*.gz
您可以使用 -j
标志指定要运行的进程数,即 -j4
。
Parallel 的好处在于它支持输入参数的笛卡尔积。例如,如果您想要为每个文件迭代一些额外的参数,您可以使用:
parallel python logparser.py ::: /path/to/logfiles/*.gz ::: 1 2 3
这将导致跨多个进程运行以下命令:
python logparser.py /path/to/logfiles/A.gz 1
python logparser.py /path/to/logfiles/A.gz 2
python logparser.py /path/to/logfiles/A.gz 3
python logparser.py /path/to/logfiles/B.gz 1
python logparser.py /path/to/logfiles/B.gz 2
python logparser.py /path/to/logfiles/B.gz 3
...
祝你好运!
关于python - 为什么使用 xargs --max-procs 加扰具有无缓冲输出的 python 进程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14367538/