python - 并行流水线编程

标签 python linux snakemake

我有一个 Linux 虚拟机,它每 4 小时更新一次新数据文件。 这些文件按编号 01 到 10 组织在目录中。

我有一个可执行文件 (convert.exe),可以将上传的文件转换为不同的文件类型。

我想开发一个管道来处理文件 (convert.exe),然后将它们重定向到另一个目录。

我已经用 linux bash 脚本对此进行了系列编程。 使用以下代码:

for d in $(find /mnt/data01/dpad -mindepth 1 -name "DIR*" -type d); do

  #recursively iterate through files
  #for those that were modified within the last day (i.e. new files added)
  for f in $(find $d -type f -mtime -1); do

    #determine appropriate folder for file to move to
    newdirname=$(basename $d)
    newfilename=$(basename $f)

    mono convert.exe $f -o /mnt/convertedfiles/$newdirname/$newfilename
  done
done

但是,我想使用我可以访问的处理能力并在多个 CPU 上并行运行它以获得更多的实时转换方法和结果。

我正计划改用 python 并使用 snakemake 来分发命令。

我不是编程新手,但我是 pythonsnakemake 命令的新手。

只是想知道是否有人可以提供一些关于如何开始这个过程的见解?

最佳答案

您可能需要使用 os , threadingmultiprocessing , 和 subprocess模块。

您需要做的第一件事是获取要处理的文件列表。您可以使用 os图书馆。 os.listdir会给你一个目录的内容。 os.walk将递归地遍历目录的内容(和子内容)。

import os
import subprocess
import multiprocessing

filepaths = []
for root, dirnames, filenames in os.walk('/root/path'):
    for filename in filenames:
        if filename.endswith('.jpg'):
            filepaths.append(os.path.join(root, filename))

现在您有了要处理的文件名列表,您需要一个可以并行运行的函数。您还需要构造所有不同的参数集以传递给该函数(在本例中为输出文件名)

arguments = []
for src in filepaths:
    dst = os.path.join('/mnt/convertedfiles', 
                       os.path.dirname(os.path.basename(src)), 
                       os.path.basename(src))
    arguments.append({'src': src, 'dst': dst})

现在我们可以创建一堆进程,每个进程处理一组不同的数据

def func(data)
    p = subprocess.Popen(['mono', 'convert.exe', data['src'], '-o', data['dst'])
    p.wait()

# Using 4 worker processes. If you don't specify a number, it defaults
# to the number of cpu cores you have.
pool = multiprocessing.Pool(4)
pool.map(func, arguments)

关于python - 并行流水线编程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37887993/

相关文章:

python - 如何最佳地合并两个大数据框

python - 使用python在linux中创建一个用户组

c++ - 在两个不同的屏幕上全屏显示两个窗口

linux - 为什么 time.h 中的 time() 没有对 sys_time 的系统调用?

python - 可执行的 Snakefile

python - Django 排除列表中*每个*项目的查询集 __in

python - heroku 上的 django : ImportError: cannot import name get_path_info

python - 创建数组的算法或代码,规则是否符合规定?

conda - 在简单的 snakemake 工作流程中使用 conda 环境

python-3.x - Thread.py 错误蛇形