问题描述
下午好,
我正在尝试并行化线性规划求解方案,下面部分复制了代码。求解方法利用 PuLP 库,该库使用子进程运行求解器 命令。
from collections import OrderedDict
from time import time
from multiprocessing import Queue,Process
from queue import Empty
from os import getpid,path,mkdir
import sys
SOLVER = None
NUMBER_OF_PROCESSES = 12
# other parameters
def choose_solver():
"""Choose an initial solver"""
if SOLVER == "CHOCO":
solver = plp.PULP_CHOCO_CMD()
elif SOLVER == "GLPK":
solver = plp.GLPK_CMD(msg=0)
elif SOLVER == "GUROBI":
solver = plp.GUROBI_CMD(msg=0)
else:
solver = plp.PULP_CBC_CMD(msg=0)
return solver
# other functions that are not multiprocess relevant
def is_infeasible(status):
"""Wrapper around PulP infeasible status"""
return status in (plp.LpStatusInfeasible,plp.LpStatusUndefined)
def feasible_problems(input_var,output_var,initial_problem,solver):
"""Perform LP solving on a initial
problem,return the feasible ones"""
input_gt = input_var - TOL >= 0
input_lt = input_var + TOL <= 0
output_eq_input = (output_var - input_var == 0)
output_eq_zero = (output_var == 0)
problem_a = initial_problem.deepcopy()
problem_a += input_gt
problem_a += output_eq_input
problem_b = initial_problem.deepcopy()
problem_b += input_lt
problem_b += output_eq_zero
problem_a.solve(solver)
problem_b.solve(solver)
status_act = problem_a.status
status_inact = problem_b.status
if is_infeasible(status_act):
return (problem_b,)
else:
if is_infeasible(status_inact):
return (problem_a,)
else:
return (problem_a,problem_b)
def worker(q,r,start_problem,start_idx,to_check):
"""Worker spawned in a new process.
Iterates over the neuron expression list.
Sends a new job to the tasks queue if two activations are available.
"""
problem = start_problem
solver = choose_solver()
for idx in range(start_idx,len(to_check) + 1):
if idx == len(to_check):
r.put_Nowait(problem)
else:
output_var,input_var = to_check[idx]
pbs = feasible_problems(input_var,problem,solver)
if len(pbs) == 1:
problem = pbs[0]
elif len(pbs) == 2:
q.put_Nowait((idx+1,pbs[0]))
problem = pbs[1]
def overseer(init_prob,neuron_exprs):
"""Running in the initial process,this function create tasks and results queues,maintain the number of current running processes
and spawn new processes when there is enough resources
for them to run.
"""
tasks = Queue()
results = Queue()
working_processes = {}
init_p = Process(target=worker,args=(tasks,results,init_prob,neuron_exprs))
init_p.start()
working_processes[init_p.pid] = init_p
res_list = []
while len(working_processes) > 0:
if len(working_processes) <= NUMBER_OF_PROCESSES:
# if there is enough room in the working queue,# spawn a new process and add it
try:
(idx,problem) = tasks.get(timeout=1)
except Empty:
break
proc = Process(target=worker,idx,neuron_exprs))
proc.start()
working_processes[proc.pid] = proc
to_del = []
for pid in working_processes:
pwork = working_processes[pid]
pwork.join(timeout=0)
if pwork.exitcode is not None:
to_del.append(pid)
for pid in to_del:
#deleting working process
del working_processes[pid]
results.join_thread()
for i in range(results.qsize()):
elt = results.get()
res_list.append(elt)
return res_list
def test_multi(init_prob,neuron_exprs):
print("Testing multi process mode")
Now = time()
init_prob,exprs = #some function that calculate those
res = overseer(init_prob,exprs)
print("Time spent: {:.4f}s".format(time()-Now))
for idx,problem in enumerate(res):
if not path.exists("results"):
mkdir("results")
problem.writeLP("results/"+str(idx))
if __name__ == '__main__':
torch_model = read_model(MODEL_PATH)
print("Number of neurons: ",count_neurons(torch_model))
print("Expected number of facets: ",theoretical_number(torch_model,DIM_INPUT))
prob,to_check,hp,var_dict = init_problem(torch_model)
test_multi(prob,to_check)
在我的 worker
中,我执行了一些可能导致两个不同问题的代价高昂的计算;
如果发生这种情况,我会将一个问题发送到任务队列,同时为当前工作进程保留另一个问题。我的监督者接受队列中的一项任务,并在可能的情况下启动一个进程。
to_check
是一个 PuLP 表达式列表,
我想要做的是用实际运行的进程填充 working_processes
字典,然后在每次迭代时查找它们的结果并删除那些已完成的进程。预期的行为是在旧进程终止时继续产生新进程,但情况似乎并非如此。然而,在这里我无限期地挂起:我成功地处理了队列中的任务,但是当我产生超过 NUMBER_OF_PROCESSES
时我的程序挂起。
我对多处理很陌生,所以我的处理方式可能有问题。有人知道吗?
解决方法
看看来自 concurrent.futures
的 ProcessPoolExecutor
。
Executor 对象允许您指定具有上限大小的工作线程池。您可以同时提交所有作业,执行者会在它们中运行,在旧作业完成时挑选新作业。