带有池/队列的 Python 多子进程在一个完成后立即恢复输出并启动队列中的下一个作业

标签 python parallel-processing queue subprocess stdout

我目前正在启动一个子进程并在不等待它完成解析 stdout 的情况下解析 stdout。

for sample in all_samples:
    my_tool_subprocess = subprocess.Popen('mytool {}'.format(sample),shell=True, stdout=subprocess.PIPE)
    line = True
    while line:
        myline = my_tool_subprocess.stdout.readline()
        #here I parse stdout..

在我的脚本中,我多次执行此操作,具体取决于输入样本的数量。

这里的主要问题是每个子进程都是一个程序/工具,在运行时 100% 使用 1 个 CPU。这需要一些时间……每次输入可能需要 20-40 分钟。

我想要实现的是设置一个同时运行的 N 最大子进程作业进程的池、队列(我不确定这里的确切术语是什么)。这样我就可以最大限度地提高性能,而不是按顺序进行。

因此,例如最多 4 个作业池的执行流程应该是:

  • 启动 4 个子流程。
  • 当其中一个作业完成时,解析标准输出并启动下一个。
  • 这样做直到队列中的所有作业都完成。

如果我能做到这一点,我真的不知道如何确定哪个示例子流程已完成。此时,我不需要识别它们,因为每个子进程都按顺序运行,并且我在子进程打印 stdout 时解析 stdout。

这非常重要,因为我需要识别每个子流程的输出并将其分配给相应的输入/样本。

最佳答案

ThreadPool 可能很适合您的问题,您设置工作线程的数量并添加作业,线程将完成所有任务。

from multiprocessing.pool import ThreadPool
import subprocess


def work(sample):
    my_tool_subprocess = subprocess.Popen('mytool {}'.format(sample),shell=True, stdout=subprocess.PIPE)
    line = True
    while line:
        myline = my_tool_subprocess.stdout.readline()
        #here I parse stdout..


num = None  # set to the number of workers you want (it defaults to the cpu count of your machine)
tp = ThreadPool(num)
for sample in all_samples:
    tp.apply_async(work, (sample,))

tp.close()
tp.join()

关于带有池/队列的 Python 多子进程在一个完成后立即恢复输出并启动队列中的下一个作业,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26774781/

相关文章:

c++ 如何优雅地将 c++17 并行执行与计算整数的 for 循环一起使用?

node.js - 简单 promise 队列 : q. 在延迟 promise 解决之前全部解决

java - 多个java spring应用程序实例访问相同的数据库资源

python - 计算从年/月开始到今天的天数

python - 使用 Python 请求库向 DVWA 发布请求时缺少 CSRF token

python - pandas 数据框 to_csv 适用于 sep ='\n' 但不适用于 sep ='\t'

c++ - 多核上的错误共享

c++ - CUDA 对许多小型数组求和

stack - 堆栈和队列之间的基本区别是什么?

python Pandas : extract data from a cell and turn it into a column