问题描述
我正在尝试使用计算机集群来运行数百万个小型模拟。为此,我尝试在主计算机上设置两个“服务器”,其中一个将输入变量添加到网络的队列中,另一个负责处理结果。
"""This script reads start parameters and calls on run_sim to run the
simulations"""
import time
from multiprocessing import Process,freeze_support,Manager,Value,Queue,current_process
from multiprocessing.managers import BaseManager
class QueueManager(BaseManager):
pass
class MultiComputers(Process):
def __init__(self,sim_name,queue):
self.sim_name = sim_name
self.queue = queue
super(MultiComputers,self).__init__()
def get_sim_obj(self,offset,db):
"""returns a list of lists from a database query"""
def handle_queue(self):
self.sim_nr = 0
sims = self.get_sim_obj()
self.total = len(sims)
while len(sims) > 0:
if self.queue.qsize() > 100:
self.queue.put(sims[0])
self.sim_nr += 1
print(self.sim_nr,round(self.sim_nr/self.total * 100,2),self.queue.qsize())
del sims[0]
def run(self):
self.handle_queue()
if __name__ == '__main__':
freeze_support()
queue = Queue()
w = MultiComputers('seed_1_hundred',queue)
w.start()
QueueManager.register('get_queue',callable=lambda: queue)
m = QueueManager(address=('',8001),authkey=b'abracadabra')
s = m.get_server()
s.serve_forever()
然后运行此队列以照顾模拟结果:
__author__ = 'axa'
from multiprocessing import Process,Queue
from multiprocessing.managers import BaseManager
import time
class QueueManager(BaseManager):
pass
class SaveFromMultiComp(Process):
def __init__(self,queue):
self.sim_name = sim_name
self.queue = queue
super(SaveFromMultiComp,self).__init__()
def run(self):
res_got = 0
with open('sim_type1_' + self.sim_name,'a') as f_1:
with open('sim_type2_' + self.sim_name,'a') as f_2:
while True:
if self.queue.qsize() > 0:
while self.queue.qsize() > 0:
res = self.queue.get()
res_got += 1
if res[0] == 1:
f_1.write(str(res[1]) + '\n')
elif res[0] == 2:
f_2.write(str(res[1]) + '\n')
print(res_got)
time.sleep(0.5)
if __name__ == '__main__':
queue = Queue()
w = SaveFromMultiComp('seed_1_hundred',queue)
w.start()
m = QueueManager(address=('',8002),authkey=b'abracadabra')
s = m.get_server()
s.serve_forever()
这些脚本可以正常处理第一个〜7-800模拟,之后在运行接收结果脚本的终端中出现以下错误:
Exception in thread Thread-1:
Traceback (most recent call last):
File "C:\python35\lib\threading.py",line 914,in _bootstrap_inner
self.run()
File "C:\python35\lib\threading.py",line 862,in run
self._target(*self._args,**self._kwargs)
File "C:\python35\lib\multiprocessing\managers.py",line 177,in accepter
t.start()
File "C:\python35\lib\threading.py",line 844,in start
_start_new_thread(self._bootstrap,())
RuntimeError: can't start new thread
任何人都可以让Som深入了解生成线程的位置和方式,每次我调用queue.get()
时都会生成新线程吗?它是如何工作的?
如果有人知道我可以做些什么来避免这种失败,我将感到非常高兴。 (我正在使用 python3.5-32 运行脚本)
解决方法
所有迹象都表明您的系统没有足够的资源来启动线程(可能是内存,但是您可能正在泄漏线程或其他资源)。您可以使用OS系统监视工具(对于Linux是top
,对于Windows是Resource Monitor
)来查看线程数和内存使用情况以进行追踪,但是我建议您使用更简单,更多的工具。高效的编程模式。
虽然不是一个完美的比较,但您通常会看到C10K problem,它指出等待结果的阻塞线程无法很好地扩展,并且很容易泄漏此类错误。解决方案是实现异步IO模式(一个启动其他工作程序的阻塞线程),这在Web服务器中很简单。
像pythons aiohttp
之类的框架应该非常适合您的需求。您只需要一个可以获取远程代码ID和结果的处理程序。该框架有望为您解决扩展问题。
因此,您可以保留启动代码,但是在启动远程计算机上的进程之后,请终止线程。然后让远程代码将HTTP消息发送给您的服务器,其中包括1)其ID和2)其结果。投入一些额外的代码,要求它再试一次,如果它没有获得200的“正常”状态代码,您的状态应该会好得多。
,我认为您必须为系统运行许多线程。我将首先检查您的系统资源,然后重新考虑我的程序。 尝试限制线程并尽量减少使用。