Child processes hang, preventing the main process from terminating

Problem description

Good afternoon,

I am trying to parallelize a linear-programming solving scheme; part of the code is copied below. The solving method relies on the PuLP library, which runs the solver command in a subprocess.

from collections import OrderedDict
from time import time
from multiprocessing import Queue,Process
from queue import Empty
from os import getpid,path,mkdir
import sys
import pulp as plp

SOLVER = None
NUMBER_OF_PROCESSES = 12
# other parameters

def choose_solver():
    """Choose an initial solver"""
    if SOLVER == "CHOCO":
        solver = plp.PULP_CHOCO_CMD()
    elif SOLVER == "GLPK":
        solver = plp.GLPK_CMD(msg=0)
    elif SOLVER == "GUROBI":
        solver = plp.GUROBI_CMD(msg=0)
    else:
        solver = plp.PULP_CBC_CMD(msg=0)
    return solver
# other functions that are not multiprocess relevant

def is_infeasible(status):
    """Wrapper around PulP infeasible status"""
    return status in (plp.LpStatusInfeasible,plp.LpStatusUndefined)

def feasible_problems(input_var,output_var,initial_problem,solver):
    """Perform LP solving on a initial
    problem,return the feasible ones"""
    input_gt = input_var - TOL >= 0
    input_lt = input_var + TOL <= 0
    output_eq_input = (output_var - input_var == 0)
    output_eq_zero = (output_var == 0)
    problem_a = initial_problem.deepcopy()
    problem_a += input_gt
    problem_a += output_eq_input
    problem_b = initial_problem.deepcopy()
    problem_b += input_lt
    problem_b += output_eq_zero
    problem_a.solve(solver)
    problem_b.solve(solver)
    status_act = problem_a.status
    status_inact = problem_b.status
    if is_infeasible(status_act):
        return (problem_b,)
    else:
        if is_infeasible(status_inact):
            return (problem_a,)
        else:
            return (problem_a,problem_b)

def worker(q,r,start_problem,start_idx,to_check):
    """Worker spawned in a new process.
    Iterates over the neuron expression list.
    Sends a new job to the tasks queue if two activations are available.
    """
    problem = start_problem
    solver = choose_solver()
    for idx in range(start_idx,len(to_check) + 1):
        if idx == len(to_check):
            r.put_nowait(problem)
        else:
            output_var,input_var = to_check[idx]
            pbs = feasible_problems(input_var,output_var,problem,solver)
            if len(pbs) == 1:
                problem = pbs[0]
            elif len(pbs) == 2:
                q.put_nowait((idx+1,pbs[0]))
                problem = pbs[1]

def overseer(init_prob,neuron_exprs):
    """Running in the initial process,this function create tasks and results queues,maintain the number of current running processes
    and spawn new processes when there is enough resources
    for them to run.
    """
    tasks = Queue()
    results = Queue()
    working_processes = {}
    init_p = Process(target=worker,args=(tasks,results,init_prob,0,neuron_exprs))
    init_p.start()
    working_processes[init_p.pid] = init_p
    res_list = []
    while len(working_processes) > 0:
        if len(working_processes) <= NUMBER_OF_PROCESSES:
            # if there is enough room in the working queue,
            # spawn a new process and add it
            try:
                (idx,problem) = tasks.get(timeout=1)
            except Empty:
                break
            proc = Process(target=worker,args=(tasks,results,problem,idx,neuron_exprs))
            proc.start()
            working_processes[proc.pid] = proc
        to_del = []
        for pid in working_processes:
            pwork = working_processes[pid]
            pwork.join(timeout=0)
            if pwork.exitcode is not None:
                to_del.append(pid)
        for pid in to_del:
            #deleting working process
            del working_processes[pid]
    results.join_thread()
    for i in range(results.qsize()):
        elt = results.get()
        res_list.append(elt)
    return res_list

def test_multi(init_prob,neuron_exprs):
    print("Testing multi process mode")
    Now = time()
    init_prob,exprs = ...  # some function that calculates those
    res = overseer(init_prob,exprs)
    print("Time spent: {:.4f}s".format(time()-Now))
    for idx,problem in enumerate(res):
        if not path.exists("results"):
            mkdir("results")
        problem.writeLP("results/"+str(idx))

if __name__ == '__main__':
    torch_model = read_model(MODEL_PATH)
    print("Number of neurons: ",count_neurons(torch_model))
    print("Expected number of facets: ",theoretical_number(torch_model,DIM_INPUT))
    prob,to_check,hp,var_dict = init_problem(torch_model)
    test_multi(prob,to_check)

In my worker, I perform some costly computations that may yield two different problems; when that happens, I send one problem to the tasks queue and keep the other for the current worker process. My overseer takes a task from the queue and spawns a process for it when possible.

to_check is a list of PuLP expressions.

What I want to do is fill the working_processes dictionary with the processes that are actually running, then on each iteration check their status and remove the ones that have finished. The expected behaviour is that new processes keep being spawned as old ones terminate, but that does not seem to happen. Instead, I hang indefinitely: the tasks from the queue are handled correctly, but as soon as I have spawned more than NUMBER_OF_PROCESSES processes, my program hangs.

I am new to multiprocessing, so there may well be something wrong with my approach. Does anyone have an idea?

Solution

Take a look at ProcessPoolExecutor from concurrent.futures.

The Executor object lets you specify a pool of worker processes with an upper bound on its size. You can submit all of your jobs at once, and the executor runs them, picking up new jobs as old ones complete.
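
For example, here is a minimal sketch of that pattern (solve_one is a hypothetical stand-in for one expensive LP solve; it is not a function from the code above):

from concurrent.futures import ProcessPoolExecutor, as_completed

def solve_one(idx):
    # hypothetical stand-in for one expensive computation, e.g. a single LP solve
    return idx * idx

if __name__ == '__main__':
    res_list = []
    # at most 12 worker processes run at any time; extra submissions
    # simply wait inside the executor until a worker becomes free
    with ProcessPoolExecutor(max_workers=12) as pool:
        futures = [pool.submit(solve_one, i) for i in range(100)]
        for fut in as_completed(futures):
            res_list.append(fut.result())
    print(len(res_list))

If a finished job can itself produce follow-up jobs, as your worker does when both problems remain feasible, you can also call pool.submit from inside the loop; the executor keeps reusing its fixed set of worker processes, so you no longer have to track and join them yourself. You would just need to keep track of the newly returned futures (for instance with concurrent.futures.wait) instead of relying only on the initial as_completed list.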