python - How to handle sub-batches of asynchronous jobs?

Tags: python subprocess

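I have a set of asynchronous jobs (about 100) that I want to run five at a time, using subprocess.Popen for each job. My plan is:

  1. Start the first five jobs in the job list
  2. Poll the running jobs every minute or so (each job takes several minutes to run)
  3. When a job finishes, start the next one, so that exactly five jobs are always running
  4. Continue until the whole job list has been processed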

Is there a known pattern in Python for doing this?

Best answer

In Python 2, I used a combination of multiprocessing.Pool and subprocess for this. But that does incur extra overhead in the form of the pool processes.

So in Python 3, I use concurrent.futures.ThreadPoolExecutor instead of multiprocessing.Pool.

The following snippet shows how to use ThreadPoolExecutor:

import concurrent.futures as cf
import logging
import os

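# data is the album dictionary that runflac() (shown below) expects.
errmsg = 'conversion of track {} failed, return code {}'
okmsg = 'finished track {}, "{}"'
num = len(data['tracks'])
# max_workers limits how many runflac() calls, and thus flac processes,
# run at once; max_workers=5 would give the question's five at a time.
with cf.ThreadPoolExecutor(max_workers=os.cpu_count()) as tp:
    fl = [tp.submit(runflac, t, data) for t in range(num)]
    # as_completed() yields each future as soon as its job has finished.
    for fut in cf.as_completed(fl):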
        idx, rv = fut.result()
        if rv == 0:
            logging.info(okmsg.format(idx+1, data['tracks'][idx]))
        else:
            logging.error(errmsg.format(idx+1, rv))

The runflac function uses subprocess to call flac(1) to convert a music file:

import subprocess

def runflac(idx, data):
    """Use the flac(1) program to convert a music file to FLAC format.

    Arguments:
        idx: track index (starts from 0)
        data: album data dictionary

    Returns:
        A tuple containing the track index and return value of flac.
    """
    num = idx + 1
    ifn = 'track{:02d}.cdda.wav'.format(num)
    args = ['flac', '--best', '--totally-silent',
            '-TARTIST=' + data['artist'], '-TALBUM=' + data['title'],
            '-TTITLE=' + data['tracks'][idx],
            '-TDATE={}'.format(data['year']),
            '-TGENRE={}'.format(data['genre']),
            '-TTRACKNUM={:02d}'.format(num), '-o',
            'track{:02d}.flac'.format(num), ifn]
    rv = subprocess.call(args, stdout=subprocess.DEVNULL,
                         stderr=subprocess.DEVNULL)
    return (idx, rv)
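The snippet above depends on the album data dictionary and on the flac program being installed. Below is a minimal self-contained sketch of the same ThreadPoolExecutor pattern, with max_workers=5 to match the question's five-at-a-time limit. The job commands (short Python one-liners that sleep and exit) and the run() helper are hypothetical placeholders for the real jobs; subprocess.run requires Python 3.5 or later.

```python
import concurrent.futures as cf
import subprocess
import sys

# Hypothetical stand-ins for the ~100 real jobs: each sleeps briefly and
# exits with return code 0, except every seventh job, which exits with 1.
commands = [
    [sys.executable, "-c",
     "import sys, time; time.sleep(0.05); sys.exit({})".format(int(i % 7 == 0))]
    for i in range(20)
]


def run(idx, cmd):
    """Run one command to completion and return (index, return code)."""
    proc = subprocess.run(cmd, stdout=subprocess.DEVNULL,
                          stderr=subprocess.DEVNULL)
    return idx, proc.returncode


failed = []
# max_workers=5: at most five run() calls, and therefore at most five
# subprocesses, are active at the same time.
with cf.ThreadPoolExecutor(max_workers=5) as tp:
    futures = [tp.submit(run, i, cmd) for i, cmd in enumerate(commands)]
    # as_completed() yields each future as soon as its job has finished.
    for fut in cf.as_completed(futures):
        idx, rv = fut.result()
        if rv != 0:
            failed.append(idx)

print("failed jobs:", sorted(failed))
```

Each worker thread simply blocks until its subprocess exits, so no explicit polling loop is needed; the executor starts the next queued job as soon as a thread becomes free.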

Update:

In Python 2.7 there is another technique that is slightly more complex, but avoids the overhead of a multiprocessing pool.

The basic form is:

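import functools
from multiprocessing import cpu_count

# args holds the parsed command-line options (files, crf, preset),
# e.g. from argparse.
starter = functools.partial(startencoder, crf=args.crf, preset=args.preset)
procs = []
maxprocs = cpu_count()  # set to 5 to run five jobs at a time
for ifile in args.files:
    # Wait until a slot frees up before starting the next job.
    while len(procs) == maxprocs:
        manageprocs(procs)
    procs.append(starter(ifile))
# All jobs have been started; wait for the remaining ones to finish.
while len(procs) > 0:
    manageprocs(procs)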

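(Using functools.partial is one way to set default arguments for a function; it is not essential to the technique.) The startencoder function is basically a wrapper around subprocess.Popen, but it returns some extra information in addition to the Popen instance: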
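To illustrate what functools.partial does in the loop above: it returns a new callable with some arguments already filled in, so each call only has to supply the file name. The encode() function and the values 23 and "medium" below are hypothetical stand-ins for startencoder and the command-line options.

```python
import functools


def encode(fname, crf, preset):
    """Hypothetical stand-in for startencoder(): just echoes its arguments."""
    return (fname, crf, preset)


# Pre-fill the keyword arguments once...
starter = functools.partial(encode, crf=23, preset="medium")

# ...so each call only needs the file name.
print(starter("movie.avi"))  # → ('movie.avi', 23, 'medium')
```

Keywords passed at call time still override the pre-filled ones, e.g. starter("a.mkv", crf=18).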

import logging
import os
import subprocess


def startencoder(fname, crf, preset):
    """
    Use ffmpeg to convert a video file to H.264/AAC streams in an MP4
    container.

    Arguments:
        fname: Name of the file to convert.
        crf: Constant rate factor. See ffmpeg docs.
        preset: Encoding preset. See ffmpeg docs.

    Returns:
        A 3-tuple of a Process, input path and output path.
    """
    basename, ext = os.path.splitext(fname)
    known = ['.mp4', '.avi', '.wmv', '.flv', '.mpg', '.mpeg', '.mov', '.ogv',
             '.mkv', '.webm']
    if ext.lower() not in known:
        ls = "File {} has unknown extension, ignoring it.".format(fname)
        logging.warning(ls)
        return (None, fname, None)
    ofn = basename + '.mp4'
    args = ['ffmpeg', '-i', fname, '-c:v', 'libx264', '-crf', str(crf),
            '-preset', preset, '-flags',  '+aic+mv4', '-c:a', 'libfaac',
            '-sn', '-y', ofn]
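    try:
        # subprocess.DEVNULL requires Python 3.3 or later.
        p = subprocess.Popen(args, stdout=subprocess.DEVNULL,
                             stderr=subprocess.DEVNULL)
        logging.info("Conversion of {} to {} started.".format(fname, ofn))
    except OSError:
        # E.g. ffmpeg is not installed. Without this return, p would be
        # unbound below; manageprocs() drops entries whose process is None.
        logging.error("Starting conversion of {} failed.".format(fname))
        return (None, fname, None)
    return (p, fname, ofn)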

The important part is the manageprocs function:

import logging
import sys
from time import sleep


def manageprocs(proclist):
    """
    Check a list of subprocesses tuples for processes that have ended and
    remove them from the list.

    Arguments:
        proclist: a list of (process, input filename, output filename)
                tuples.
    """
    nr = '# of conversions running: {}\r'.format(len(proclist))
    logging.info(nr)
    sys.stdout.flush()
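    # Iterate over a copy: removing items from the list being iterated
    # over would make the loop skip the element after each removed one.
    for p in proclist[:]: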
        pr, ifn, ofn = p
        if pr is None:
            proclist.remove(p)
        elif pr.poll() is not None:
            logging.info('Conversion of {} to {} finished.'.format(ifn, ofn))
            proclist.remove(p)
    sleep(0.5)
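A minimal self-contained sketch of the same Popen-polling technique, with at most five jobs running at once as in the question. Unlike manageprocs, the hypothetical reap() helper rebuilds the running list instead of removing entries from it in place, and it records each job's return code. The job commands and the 0.1 s poll interval are placeholders; for jobs that take minutes, polling every few tens of seconds would be enough.

```python
import subprocess
import sys
import time

MAXPROCS = 5          # run at most five jobs at once, as in the question
POLL_INTERVAL = 0.1   # seconds between polls (placeholder value)

# Hypothetical placeholder jobs: each sleeps briefly and exits with code 0.
jobs = [[sys.executable, "-c", "import time; time.sleep(0.2)"] for _ in range(12)]


def reap(running, finished):
    """Move finished (index, Popen) pairs out of running into finished."""
    still_running = []
    for idx, proc in running:
        rv = proc.poll()  # None while the process is still running
        if rv is None:
            still_running.append((idx, proc))
        else:
            finished[idx] = rv
    running[:] = still_running  # update the caller's list in place


running = []   # (job index, Popen) pairs
finished = {}  # job index -> return code
max_seen = 0   # highest number of jobs running at the same time
for idx, cmd in enumerate(jobs):
    # Block until a slot is free, then start the next job.
    while len(running) >= MAXPROCS:
        time.sleep(POLL_INTERVAL)
        reap(running, finished)
    running.append((idx, subprocess.Popen(cmd)))
    max_seen = max(max_seen, len(running))
# Every job has been started; wait for the last ones to finish.
while running:
    time.sleep(POLL_INTERVAL)
    reap(running, finished)

print(len(finished), "jobs finished, at most", max_seen, "ran at once")
```

Calling poll() on each process also reaps it once it has exited, so no zombie processes are left behind.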

Regarding "python - How to handle sub-batches of asynchronous jobs?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/36671002/
