我的结构(大大简化)如下所示:
import multiprocessing
def creator():
# creates files
return
def relocator():
# moves created files
return
create = multiprocessing.Process(target=creator)
relocate = multiprocessing.Process(target=relocator)
create.start()
relocate.start()
我想做的是让 creator
创建一堆文件,一旦 创建它们,就通过 relocator 将它们移动到另一个目录
。
我想在这里使用multiprocessing
的原因是:
- 我不希望
creator
先等待移动完成,因为移动需要时间,我不想浪费。 - 在开始复制之前先创建所有文件也不是一种选择,因为驱动器中没有足够的空间容纳所有这些文件。
我希望 creator
和 relocator
进程都是串行的(每次一个文件)但并行运行。操作的“日志”应该像这样:
# creating file 1
# creating file 2 and relocating file 1
# creating file 3 and relocating file 2
# ...
# relocating last file
根据我所阅读的内容,Queue
是前往此处的方式。
策略:(也许不是最好的?!)
文件创建后将进入队列,完成重定位后将从队列中删除。
但是我在编码时遇到了问题;同时创建多个文件(creator
的多个实例并行运行)和其他...
如果有任何想法、提示、解释等,我将不胜感激
最佳答案
让我们把你的想法分解成这个特性:
Creator 应该创建文件(例如 100 个)
Relocator 应该一次移动 1 个文件,直到没有更多文件要移动为止
Creator 可能会在 Relocator 之前结束,所以它也可以 将自己变成一个重定位器 双方都必须知道什么时候 完成
因此,我们有 2 个主要功能:
def create(i):
# creates files and return outpath
return os.path.join("some/path/based/on/stuff", "{}.ext".format(i))
def relocate(from, to):
# moves created files
shuttil.move(from, to)
现在让我们创建我们的流程:
from multiprocessing import Process, Queue
comm_queue = Queue()
#process that create the files and push the data into the queue
def creator(comm_q):
for i in range(100):
comm_q.put(create(i))
comm_q.put("STOP_FLAG") # we tell the workers when to stop, we just push one since we only have one more worker
#the relocator works till it gets an stop flag
def relocator(comm_q):
data = comm_q.get()
while data != "STOP_FLAG":
if data:
relocate(data, to_path_you_may_want)
data = comm_q.get()
creator_process= multiprocessing.Process(target=creator, args=(comm_queue))
relocators = multiprocessing.Process(target=relocator, args=(comm_queue))
creator_process.start()
relocators .start()
这样我们现在就有了一个创建者和一个重定位器,但是,假设现在我们希望 Creator 在创建工作完成后开始重定位,我们可以只使用重定位器,但是我们将需要再推送一个 "STOP_FLAG"
,因为我们将有 2 个进程重新定位
def creator(comm_q):
for i in range(100):
comm_q.put(create(i))
for _ in range(2):
comm_q.put("STOP_FLAG")
relocator(comm_q)
假设我们现在想要任意数量的重定位器进程,我们应该稍微调整我们的代码来处理这个,我们需要 creator
方法来知道有多少标志通知其他处理何时停止,我们的结果代码如下所示:
from multiprocessing import Process, Queue, cpu_count
comm_queue = Queue()
#process that create the files and push the data into the queue
def creator(comm_q, number_of_subprocesses):
for i in range(100):
comm_q.put(create(i))
for _ in range(number_of_subprocesses + 1): # we need to count ourselves
comm_q.put("STOP_FLAG")
relocator(comm_q)
#the relocator works till it gets an stop flag
def relocator(comm_q):
data = comm_q.get()
while data != "STOP_FLAG":
if data:
relocate(data, to_path_you_may_want)
data = comm_q.get()
num_of_cpus = cpu_count() #we will spam as many processes as cpu core we have
creator_process= Process(target=creator, args=(comm_queue, num_of_cpus))
relocators = [Process(target=relocator, args=(comm_queue)) for _ in num_of_cpus]
creator_process.start()
for rp in relocators:
rp.start()
然后你将不得不等待让他们完成:
creator_process.join()
for rp in relocators:
rp.join()
您可能想查看 multiprocessing.Queue
documentation
特别是get
method (默认是阻塞调用)
Remove and return an item from the queue. If optional args block is True (the default) and timeout is None (the default), block if necessary until an item is available.
关于python - 与多处理队列作斗争,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48113927/