I'm trying to set up a number of "worker" threads/processes, prime each of them with a list of "commands", and then have them step through those commands in lock-step.

Update: I've had a few questions about why I'm taking this approach, so here is a little background. I'm using this to write automated test scripts. I'm simulating a multi-user environment in which different users run an application against a shared resource. I want to perform a series of operations against the API from several clients at the same time, and I want to control what each worker does so that the tests are repeatable. Without synchronisation I can't guarantee that the operations execute in the order I expect. Another requirement (which I may not have mentioned) is that I want the commands to execute concurrently, e.g. all workers writing a large amount of data to the DB at the same time.
I'm using the multiprocessing module with Python 2.7.5b3 on Windows 7. So far I have the following example working, which illustrates what I'm trying to do.
This example has the workers write their results back to a shared queue, so I can see the order in which the commands are executed.

The worker (worker.py):
from multiprocessing import Process, Queue, Event, Lock

class Worker(Process):

    def __init__(self, execute, q_out):
        Process.__init__(self)
        print self.name, 'init'
        self.daemon = True
        self.q_in = Queue()
        self.q_out = q_out
        self.execute = execute

    def run(self):
        print self.name, 'running'
        self.execute.wait()
        while not self.q_in.empty():
            cmd = self.q_in.get()
            self.q_out.put((self.name, cmd))
The manager (manager.py):
from multiprocessing import Event, Queue
from worker import Worker

if __name__ == '__main__':
    workers = []
    syncEvent = Event()
    shared_q = Queue()

    for i in range(0, 2):
        worker = Worker(syncEvent, shared_q)
        map(worker.q_in.put, ['A', 'B', 'C'])
        workers.append(worker)
        worker.start()

    syncEvent.set()

    for w in workers:
        w.join()

    while not shared_q.empty():
        print shared_q.get()
This gives me output like:
Worker-1 init
Worker-2 init
Worker-1 running
Worker-2 running
('Worker-1', 'A')
('Worker-1', 'B')
('Worker-1', 'C')
('Worker-2', 'A')
('Worker-2', 'B')
('Worker-2', 'C')
What I want to achieve is this output:
Worker-1 init
Worker-2 init
Worker-1 running
Worker-2 running
('Worker-1', 'A')
('Worker-2', 'A')
('Worker-1', 'B')
('Worker-2', 'B')
('Worker-1', 'C')
('Worker-2', 'C')
I've looked at Lock and RLock, but they don't seem to fit the bill, because I want all the workers running at the same time, pausing only to wait until the others have all finished before executing the next command. I'm sure there is a nice, simple way of doing this, but I can't quite work out what it is. Does anyone have any suggestions on how to proceed?
Best answer

The class below can be used to synchronise the processes. It essentially holds every process on a shared condition variable; when the last worker arrives, it notifies all the others, which wake up and are able to continue.
worker.py
from multiprocessing import Process, Queue, Event, Lock

class Worker(Process):

    def __init__(self, execute, q_out, syncher):
        Process.__init__(self)
        print self.name, 'init'
        self.daemon = True
        self.q_in = Queue()
        self.q_out = q_out
        self.execute = execute
        self.syncher = syncher

    def run(self):
        print self.name, 'running'
        self.execute.wait()
        while not self.q_in.empty():
            self.syncher.check()
            cmd = self.q_in.get()
            self.q_out.put((self.name, cmd))
manager.py
from multiprocessing import Event, Queue, Condition, Lock, Value
from worker import Worker

class Synchroniser(object):

    def __init__(self, workers):
        self.workers_locked = Value('i', 0)
        self.workers = workers
        self.condition = Condition(Lock())

    def check(self):
        with self.condition:
            self.workers_locked.value += 1
            if self.workers_locked.value >= self.workers:
                self.condition.notify_all()
            else:
                self.condition.wait()
            self.workers_locked.value -= 1

if __name__ == '__main__':
    workers = []
    syncEvent = Event()
    shared_q = Queue()
    worker_num = 2
    syncher = Synchroniser(worker_num)

    for i in range(0, worker_num):
        worker = Worker(syncEvent, shared_q, syncher)
        map(worker.q_in.put, ['A', 'B', 'C'])
        workers.append(worker)
        worker.start()

    syncEvent.set()

    for w in workers:
        w.join()

    while not shared_q.empty():
        print shared_q.get()
> python manager.py
Worker-1 init
Worker-2 init
Worker-1 running
Worker-2 running
('Worker-1', 'A')
('Worker-2', 'A')
('Worker-2', 'B')
('Worker-1', 'B')
('Worker-1', 'C')
('Worker-2', 'C')
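As a side note, this "wait until everyone arrives, then release all" pattern is a cyclic barrier, and Python 3.3+ ships it ready-made as multiprocessing.Barrier, so on a newer interpreter the hand-rolled Synchroniser isn't needed (it's not available on the Python 2.7 the question targets). Here is a minimal Python 3 sketch of the same lock-step workers; the function and variable names are illustrative, not from the code above:

```python
# Lock-step workers with multiprocessing.Barrier (Python 3.3+).
# Barrier(n) blocks each caller of wait() until n parties have arrived,
# then releases them all and resets itself for the next step.
from multiprocessing import Barrier, Process, Queue


def worker(name, commands, barrier, q_out):
    for cmd in commands:
        barrier.wait()           # wait until every worker is ready for this step
        q_out.put((name, cmd))   # then run the command "simultaneously"


def run_demo(n_workers=2, commands=('A', 'B', 'C')):
    barrier = Barrier(n_workers)
    q_out = Queue()
    procs = [Process(target=worker,
                     args=('Worker-%d' % (i + 1), commands, barrier, q_out))
             for i in range(n_workers)]
    for p in procs:
        p.start()
    # Collect exactly one result per worker per command; get() blocks
    # until each item arrives, unlike the unreliable Queue.empty().
    results = [q_out.get() for _ in range(n_workers * len(commands))]
    for p in procs:
        p.join()
    return results


if __name__ == '__main__':
    for item in run_demo():
        print(item)
```

Each worker still executes its own commands in order, and no worker starts command B until every worker has passed the barrier for it; as in the output above, the exact interleaving of the two workers' lines within a step can still vary from run to run.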
A similar Stack Overflow question on Python process synchronisation: https://stackoverflow.com/questions/26805893/