Python:并行执行cat子进程

标签 python shell subprocess python-multithreading

我正在运行多个 cat | zgrep 在远程服务器上执行命令并分别收集它们的输出以供进一步处理:

class MainProcessor(mp.Process):
    def __init__(self, peaks_array):
        super(MainProcessor, self).__init__()
        self.peaks_array = peaks_array

    def run(self):
        for peak_arr in self.peaks_array:
            peak_processor = PeakProcessor(peak_arr)
            peak_processor.start()

class PeakProcessor(mp.Process):
    def __init__(self, peak_arr):
        super(PeakProcessor, self).__init__()
        self.peak_arr = peak_arr

    def run(self):
        command = 'ssh remote_host cat files_to_process | zgrep --mmap "regex" '
        log_lines = (subprocess.check_output(command, shell=True)).split('\n')
        process_data(log_lines)

但是,这会导致顺序执行 subprocess('ssh ... cat ...') 命令。第二个高峰等待第一个完成,依此类推。

我如何修改此代码以便子进程调用并行运行,同时仍然能够单独收集每个子进程的输出?

最佳答案

您不需要multiprocessingthreading 来并行运行子进程。例如:

#!/usr/bin/env python
from subprocess import Popen

# run commands in parallel
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True)
             for i in range(5)]
# collect statuses
exitcodes = [p.wait() for p in processes]

它同时运行 5 个 shell 命令。注意:这里既没有使用线程也没有使用 multiprocessing 模块。向 shell 命令添加符号 & 没有意义:Popen 不等待命令完成。您需要显式调用 .wait()

虽然很方便但没必要使用线程来收集子进程的输出:

#!/usr/bin/env python
from multiprocessing.dummy import Pool # thread pool
from subprocess import Popen, PIPE, STDOUT

# run commands in parallel
processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True,
                   stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
             for i in range(5)]

# collect output in parallel
def get_lines(process):
    return process.communicate()[0].splitlines()

outputs = Pool(len(processes)).map(get_lines, processes)

相关:Python threading multiple bash subprocesses? .

这是在同一线程中同时从多个子进程获取输出的代码示例 (Python 3.8+):

#!/usr/bin/env python3
import asyncio
import sys
from subprocess import PIPE, STDOUT


async def get_lines(shell_command):
    p = await asyncio.create_subprocess_shell(
        shell_command, stdin=PIPE, stdout=PIPE, stderr=STDOUT
    )
    return (await p.communicate())[0].splitlines()


async def main():
    # get commands output in parallel
    coros = [
        get_lines(
            f'"{sys.executable}" -c "print({i:d}); import time; time.sleep({i:d})"'
        )
        for i in range(5)
    ]
    print(await asyncio.gather(*coros))


if __name__ == "__main__":
    asyncio.run(main())

旧(2014)答案(Python 3.4?):

#!/usr/bin/env python3
import asyncio
import sys
from asyncio.subprocess import PIPE, STDOUT

@asyncio.coroutine
def get_lines(shell_command):
    p = yield from asyncio.create_subprocess_shell(shell_command,
            stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    return (yield from p.communicate())[0].splitlines()

if sys.platform.startswith('win'):
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

# get commands output in parallel
coros = [get_lines('"{e}" -c "print({i:d}); import time; time.sleep({i:d})"'
                    .format(i=i, e=sys.executable)) for i in range(5)]
print(loop.run_until_complete(asyncio.gather(*coros)))
loop.close()

关于Python:并行执行cat子进程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/23611396/

相关文章:

python - 我的Python代码中的问题是检测到眨眼次数过多。 (使用眼睛纵横比方法)

shell - 波浪号 (~/) 无法处理 Shell 脚本中的 if then 语句

bash - 如何在不影响其他完成脚本的情况下重置 COMP_WORDBREAKS?

python - 您可以只与子流程通信一次吗?

用于运行 shell 命令的 Python 脚本

Python:模拟 Kafka 进行集成测试

python - 使用 urllib2 HTTPBasicAuthHandler 时,用户名包含空格会出现问题

Python BigQuery allowLargeResults 与 pandas.io.gbq

swift - 使用参数从 swift 应用程序调用 shell

python - django celery 终止任务的子进程