python - 与多处理队列作斗争

标签 python python-3.x parallel-processing multiprocessing

我的结构(大大简化)如下所示:

import multiprocessing

def creator():
    # creates files
    return


def relocator():
    # moves created files
    return


create = multiprocessing.Process(target=creator)
relocate = multiprocessing.Process(target=relocator)
create.start()
relocate.start()

我想做的是让 creator 创建一堆文件,一旦 创建它们,就通过 relocator 将它们移动到另一个目录

我想在这里使用multiprocessing的原因是:

  • 我不希望 creator 先等待移动完成,因为移动需要时间,我不想浪费。
  • 在开始复制之前先创建所有文件也不是一种选择,因为驱动器中没有足够的空间容纳所有这些文件。

我希望 creatorrelocator 进程都是串行的(每次一个文件)但并行运行。操作的“日志”应该像这样:

# creating file 1
# creating file 2 and relocating file 1
# creating file 3 and relocating file 2
# ...
# relocating last file

根据我所阅读的内容,Queue 是前往此处的方式。

策略:(也许不是最好的?!)

文件创建后将进入队列,完成重定位后将从队列中删除。

但是我在编码时遇到了问题;同时创建多个文件(creator 的多个实例并行运行)和其他...

如果有任何想法、提示、解释等,我将不胜感激

最佳答案

让我们把你的想法分解成这个特性:

  1. Creator 应该创建文件(例如 100 个)

  2. Relocator 应该一次移动 1 个文件,直到没有更多文件要移动为止

  3. Creator 可能会在 Relocator 之前结束,所以它也可以 将自己变成一个重定位器 双方都必须知道什么时候 完成

因此,我们有 2 个主要功能:

def create(i):
    # creates files and return outpath
    return os.path.join("some/path/based/on/stuff", "{}.ext".format(i))


def relocate(from, to):
    # moves created files
    shuttil.move(from, to)

现在让我们创建我们的流程:

from multiprocessing import Process, Queue

comm_queue = Queue()

#process that create the files and push the data into the queue
def creator(comm_q):
    for i in range(100):
        comm_q.put(create(i))
    comm_q.put("STOP_FLAG") # we tell the workers when to stop, we just push one since we only have one more worker

#the relocator works till it gets an stop flag
def relocator(comm_q):
    data = comm_q.get()
    while data != "STOP_FLAG":
        if data:
            relocate(data, to_path_you_may_want)
        data = comm_q.get()

creator_process= multiprocessing.Process(target=creator, args=(comm_queue))
relocators = multiprocessing.Process(target=relocator, args=(comm_queue))
creator_process.start()
relocators .start()

这样我们现在就有了一个创建者和一个重定位器,但是,假设现在我们希望 Creator 在创建工作完成后开始重定位,我们可以只使用重定位器,但是我们将需要再推送一个 "STOP_FLAG",因为我们将有 2 个进程重新定位

def creator(comm_q):
    for i in range(100):
        comm_q.put(create(i))
    for _ in range(2):
        comm_q.put("STOP_FLAG")
    relocator(comm_q)

假设我们现在想要任意数量的重定位器进程,我们应该稍微调整我们的代码来处理这个,我们需要 creator 方法来知道有多少标志通知其他处理何时停止,我们的结果代码如下所示:

from multiprocessing import Process, Queue, cpu_count

comm_queue = Queue()

#process that create the files and push the data into the queue
def creator(comm_q, number_of_subprocesses):
    for i in range(100):
        comm_q.put(create(i))
    for _ in range(number_of_subprocesses + 1): # we need to count ourselves
        comm_q.put("STOP_FLAG")
    relocator(comm_q)

#the relocator works till it gets an stop flag
def relocator(comm_q):
    data = comm_q.get()
    while data != "STOP_FLAG":
        if data:
            relocate(data, to_path_you_may_want)
        data = comm_q.get()

num_of_cpus = cpu_count() #we will spam as many processes as cpu core we have
creator_process= Process(target=creator, args=(comm_queue, num_of_cpus))
relocators = [Process(target=relocator, args=(comm_queue)) for _ in num_of_cpus]
creator_process.start()
for rp in relocators:
    rp.start()

然后你将不得不等待让他们完成:

creator_process.join()
for rp in relocators:
    rp.join()

您可能想查看 multiprocessing.Queue documentation

特别是get method (默认是阻塞调用)

Remove and return an item from the queue. If optional args block is True (the default) and timeout is None (the default), block if necessary until an item is available.

关于python - 与多处理队列作斗争,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48113927/

相关文章:

python - ssh 到服务器并使用 Python 执行等效的curl

vb.net - 使用 VB.Net Parallel.ForEach 和 ConcurrentDictionary 的正确语法是什么?

logging - 并行 MATLAB 和日志记录

python - 在 python 中将字符串转换为数组的最快方法是什么?

python - python数据集上的Groupby函数

python - Python 中的优化技术

python - 如何在命令行中显示 Markdown 文本?

python - 使用python的multiprocessing和process defunc进行并行编程

python - 将纯 python 部署到 heroku

python - 如何在Window OS上安装conda-pack环境