Python进程同步

标签 python multithreading synchronization

我正在尝试设置许多“工作”线程/进程,用一个“命令”列表来准备它们,然后让它们一步一步地逐步执行命令。

更新 关于我为什么采用这种方法,我有几个问题,所以这里有一点背景:我正在使用这种方法来编写自动化测试脚本。我正在模拟一个多用户环境,其中不同的用户正在运行一个指向共享资源的应用程序。我想同时对来自多个客户端的 API 执行一系列操作。我希望能够控制每个 worker 做什么来产生一组可重复的测试。如果没有同步,我将无法保证操作按我期望的顺序执行。另一个要求(我可能没有提到)是我希望同时执行这些命令。例如。全部向DB写入大量数据。

我正在使用 multiprocessing Windows 7 上 Python 2.7.5b3 中的模块。到目前为止,我有以下示例正在运行,这说明了我正在尝试做的事情。

此示例让工作人员将结果写回共享队列,因此我可以看到命令的执行顺序。

worker (worker.py):

from multiprocessing import Process, Queue, Event, Lock

class Worker(Process):

    def __init__(self, execute, q_out):
        Process.__init__(self)
        print self.name, 'init'
        self.daemon = True
        self.q_in = Queue()
        self.q_out = q_out
        self.execute = execute

    def run(self):
        print self.name, 'running'
        self.execute.wait()
        while not self.q_in.empty():
            cmd = self.q_in.get()
            self.q_out.put((self.name, cmd))

管理者:
from multiprocessing import Event, Queue
from worker import Worker

if __name__ == '__main__':    

    workers = []
    syncEvent = Event()
    shared_q = Queue()
    for i in range(0,2):
        worker = Worker(syncEvent, shared_q)
        map(worker.q_in.put, ['A', 'B', 'C'])
        workers.append(worker)
        worker.start()

    syncEvent.set()

    for w in workers:
        w.join()

    while not shared_q.empty():
        print shared_q.get()

这给了我如下输出:
Worker-1 init
Worker-2 init
Worker-1 running
Worker-2 running
('Worker-1', 'A')
('Worker-1', 'B')
('Worker-1', 'C')
('Worker-2', 'A')
('Worker-2', 'B')
('Worker-2', 'C')

我想要实现的是这个输出:
Worker-1 init
Worker-2 init
Worker-1 running
Worker-2 running
('Worker-1', 'A')
('Worker-2', 'A')
('Worker-1', 'B')
('Worker-2', 'B')
('Worker-1', 'C')
('Worker-2', 'C')

我已锁定LockRLock ,但这似乎不符合要求,因为我试图让所有线程同时运行,但只需停止并等待其他线程全部完成,然后再执行下一个命令。

我确信有一种很好且简单的方法可以做到这一点,但我无法完全理解它是什么。有人对如何进行有任何建议吗?

最佳答案

这个类应该用来同步进程。它基本上将所有进程保持在线程条件下,当最后一个工作人员完成时,它会通知所有其他被唤醒并能够继续的进程

worker .py

from multiprocessing import Process, Queue, Event, Lock

class Worker(Process):

    def __init__(self, execute, q_out, syncher):
        Process.__init__(self)
        print self.name, 'init'
        self.daemon = True
        self.q_in = Queue()
        self.q_out = q_out
        self.execute = execute
        self.syncher = syncher

    def run(self):
        print self.name, 'running'
        self.execute.wait()
        while not self.q_in.empty():
            self.syncher.check()
            cmd = self.q_in.get()
            self.q_out.put((self.name, cmd))

管理器.py
from multiprocessing import Event, Queue, Condition, Lock, Value
from worker import Worker

class Synchroniser(object):
    def __init__(self, workers):
        self.workers_locked = Value('i', 0)
        self.workers = workers
        self.condition = Condition(Lock())

    def check(self):
        with self.condition:
            self.workers_locked.value += 1
            if self.workers_locked.value >= self.workers:
                self.condition.notify_all()
            else:
                self.condition.wait()
            self.workers_locked.value -= 1

if __name__ == '__main__':

    workers = []
    syncEvent = Event()
    shared_q = Queue()
    worker_num = 2
    syncher = Synchroniser(worker_num)
    for i in range(0,worker_num):
        worker = Worker(syncEvent, shared_q, syncher)
        map(worker.q_in.put, ['A', 'B', 'C'])
        workers.append(worker)
        worker.start()

    syncEvent.set()

    for w in workers:
        w.join()

    while not shared_q.empty():
        print shared_q.get()
> python manager.py 
Worker-1 init
Worker-2 init
Worker-1 running
Worker-2 running
('Worker-1', 'A')
('Worker-2', 'A')
('Worker-2', 'B')
('Worker-1', 'B')
('Worker-1', 'C')
('Worker-2', 'C')

关于Python进程同步,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26805893/

相关文章:

java - 多线程服务器如何工作?

java - 解释使用迭代器时集合的同步?

python - Peewee 按需添加列

java - DateFormat 或 Calender.getInstance 有时会返回随机值

python 项目 euler 6 任意数字

c++ - openmp中的并行for循环

multithreading - ConcurrentUpdateSolrClient 如何处理更新请求?

Java + Eclipse : Synchronize stdout and stderr

python .rstrip 删除一个额外的字符

php - 将街道名称与街道号码分开