我正在尝试使用一组计算机来运行数百万个小型模拟。为此,我尝试在主计算机上设置两台“服务器”,一台用于将队列中的输入变量添加到网络,另一台负责处理结果。
这是将东西放入模拟变量队列的代码:
"""This script reads start parameters and calls on run_sim to run the
simulations"""
import time
from multiprocessing import Process, freeze_support, Manager, Value, Queue, current_process
from multiprocessing.managers import BaseManager
class QueueManager(BaseManager):
pass
class MultiComputers(Process):
def __init__(self, sim_name, queue):
self.sim_name = sim_name
self.queue = queue
super(MultiComputers, self).__init__()
def get_sim_obj(self, offset, db):
"""returns a list of lists from a database query"""
def handle_queue(self):
self.sim_nr = 0
sims = self.get_sim_obj()
self.total = len(sims)
while len(sims) > 0:
if self.queue.qsize() > 100:
self.queue.put(sims[0])
self.sim_nr += 1
print(self.sim_nr, round(self.sim_nr/self.total * 100, 2), self.queue.qsize())
del sims[0]
def run(self):
self.handle_queue()
if __name__ == '__main__':
freeze_support()
queue = Queue()
w = MultiComputers('seed_1_hundred', queue)
w.start()
QueueManager.register('get_queue', callable=lambda: queue)
m = QueueManager(address=('', 8001), authkey=b'abracadabra')
s = m.get_server()
s.serve_forever()
然后这个队列运行来处理模拟的结果:__author__ = 'axa'
from multiprocessing import Process, freeze_support, Queue
from multiprocessing.managers import BaseManager
import time
class QueueManager(BaseManager):
pass
class SaveFromMultiComp(Process):
def __init__(self, sim_name, queue):
self.sim_name = sim_name
self.queue = queue
super(SaveFromMultiComp, self).__init__()
def run(self):
res_got = 0
with open('sim_type1_' + self.sim_name, 'a') as f_1:
with open('sim_type2_' + self.sim_name, 'a') as f_2:
while True:
if self.queue.qsize() > 0:
while self.queue.qsize() > 0:
res = self.queue.get()
res_got += 1
if res[0] == 1:
f_1.write(str(res[1]) + '\n')
elif res[0] == 2:
f_2.write(str(res[1]) + '\n')
print(res_got)
time.sleep(0.5)
if __name__ == '__main__':
queue = Queue()
w = SaveFromMultiComp('seed_1_hundred', queue)
w.start()
m = QueueManager(address=('', 8002), authkey=b'abracadabra')
s = m.get_server()
s.serve_forever()
这些脚本按预期工作以处理第一个 ~7-800 模拟,之后我在运行接收结果脚本的终端中收到以下错误:Exception in thread Thread-1:
Traceback (most recent call last):
File "C:\Python35\lib\threading.py", line 914, in _bootstrap_inner
self.run()
File "C:\Python35\lib\threading.py", line 862, in run
self._target(*self._args, **self._kwargs)
File "C:\Python35\lib\multiprocessing\managers.py", line 177, in accepter
t.start()
File "C:\Python35\lib\threading.py", line 844, in start
_start_new_thread(self._bootstrap, ())
RuntimeError: can't start new thread
任何人都可以对线程产生的位置和方式提供一些见解,每次我打电话时都会产生一个新线程queue.get()
或者它是如何工作的?如果有人知道我可以做些什么来避免这种失败,我会很高兴? (我正在使用 Python3.5-32 运行脚本)
最佳答案
所有迹象都表明您的系统没有启动线程所需的资源(可能是内存,但您可能正在泄漏线程或其他资源)。您可以使用操作系统系统监控工具(Linux 为 top
,Windows 为 Resource Monitor
)查看线程数和内存使用情况以进行跟踪,但我建议您使用更简单、更高效的编程模式。
虽然不是一个完美的比较,但您通常会看到 C10K problem并且它指出等待结果的阻塞线程不能很好地扩展并且可能容易泄漏这样的错误。解决方案是实现异步 IO 模式(一个启动其他工作线程的阻塞线程),这在 Web 服务器中非常简单。
像pythons这样的框架aiohttp
应该很适合你想要的。您只需要一个可以获取远程代码 ID 和结果的处理程序。该框架应该有望为您处理缩放。
因此,在您的情况下,您可以保留启动代码,但是在远程机器上启动进程后,终止线程。然后让远程代码向您的服务器发送一条 HTTP 消息,其中包含 1) 其 ID 和 2) 其结果。如果它没有得到 200 'OK' 状态代码,请输入一些额外的代码来要求它再试一次,你应该处于更好的状态。
关于Python无法启动新线程多处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63532917/