Python无法启动新的线程多处理

问题描述

我正在尝试使用计算机集群来运行数百万个小型模拟。为此,我尝试在主计算机上设置两个“服务器”,其中一个将输入变量添加到网络的队列中,另一个负责处理结果。

这是将内容放入仿真变量队列的代码

"""This script reads start parameters and calls on run_sim to run the
simulations"""
import time
from multiprocessing import Process,freeze_support,Manager,Value,Queue,current_process
from multiprocessing.managers import BaseManager


class QueueManager(BaseManager):
    pass


class MultiComputers(Process):
    def __init__(self,sim_name,queue):
        self.sim_name = sim_name
        self.queue = queue
        super(MultiComputers,self).__init__()

    def get_sim_obj(self,offset,db):
        """returns a list of lists from a database query"""

    def handle_queue(self):
        self.sim_nr = 0
        sims = self.get_sim_obj()
        self.total = len(sims)
        while len(sims) > 0:
            if self.queue.qsize() > 100:
                self.queue.put(sims[0])
                self.sim_nr += 1
                print(self.sim_nr,round(self.sim_nr/self.total * 100,2),self.queue.qsize())
                del sims[0]

    def run(self):
        self.handle_queue()

if __name__ == '__main__':
    freeze_support()
    queue = Queue()
    w = MultiComputers('seed_1_hundred',queue)
    w.start()
    QueueManager.register('get_queue',callable=lambda: queue)
    m = QueueManager(address=('',8001),authkey=b'abracadabra')
    s = m.get_server()
    s.serve_forever()

然后运行此队列以照顾模拟结果:

__author__ = 'axa'
from multiprocessing import Process,Queue
from multiprocessing.managers import BaseManager
import time


class QueueManager(BaseManager):
    pass


class SaveFromMultiComp(Process):
    def __init__(self,queue):
        self.sim_name = sim_name
        self.queue = queue
        super(SaveFromMultiComp,self).__init__()

    def run(self):
        res_got = 0
        with open('sim_type1_' + self.sim_name,'a') as f_1:
            with open('sim_type2_' + self.sim_name,'a') as f_2:
                while True:
                    if self.queue.qsize() > 0:
                        while self.queue.qsize() > 0:
                            res = self.queue.get()
                            res_got += 1
                            if res[0] == 1:
                                f_1.write(str(res[1]) + '\n')
                            elif res[0] == 2:
                                f_2.write(str(res[1]) + '\n')
                            print(res_got)
                    time.sleep(0.5)


if __name__ == '__main__':
    queue = Queue()
    w = SaveFromMultiComp('seed_1_hundred',queue)
    w.start()
    m = QueueManager(address=('',8002),authkey=b'abracadabra')
    s = m.get_server()
    s.serve_forever()

这些脚本可以正常处理第一个〜7-800模拟,之后在运行接收结果脚本的终端中出现以下错误

Exception in thread Thread-1:
Traceback (most recent call last):
  File "C:\python35\lib\threading.py",line 914,in _bootstrap_inner
    self.run()
  File "C:\python35\lib\threading.py",line 862,in run
    self._target(*self._args,**self._kwargs)
  File "C:\python35\lib\multiprocessing\managers.py",line 177,in accepter
    t.start()
  File "C:\python35\lib\threading.py",line 844,in start
    _start_new_thread(self._bootstrap,())
RuntimeError: can't start new thread

任何人都可以让Som深入了解生成线程的位置和方式,每次我调用queue.get()时都会生成新线程吗?它是如何工作的? 如果有人知道我可以做些什么来避免这种失败,我将感到非常高兴。 (我正在使用 python3.5-32 运行脚本)

解决方法

所有迹象都表明您的系统没有足够的资源来启动线程(可能是内存,但是您可能正在泄漏线程或其他资源)。您可以使用OS系统监视工具(对于Linux是top,对于Windows是Resource Monitor)来查看线程数和内存使用情况以进行追踪,但是我建议您使用更简单,更多的工具。高效的编程模式。

虽然不是一个完美的比较,但您通常会看到C10K problem,它指出等待结果的阻塞线程无法很好地扩展,并且很容易泄漏此类错误。解决方案是实现异步IO模式(一个启动其他工作程序的阻塞线程),这在Web服务器中很简单。

像pythons aiohttp之类的框架应该非常适合您的需求。您只需要一个可以获取远程代码ID和结果的处理程序。该框架有望为您解决扩展问题。

因此,您可以保留启动代码,但是在启动远程计算机上的进程之后,请终止线程。然后让远程代码将HTTP消息发送给您的服务器,其中包括1)其ID和2)其结果。投入一些额外的代码,要求它再试一次,如果它没有获得200的“正常”状态代码,您的状态应该会好得多。

,

我认为您必须为系统运行许多线程。我将首先检查您的系统资源,然后重新考虑我的程序。 尝试限制线程并尽量减少使用。