问题描述
我有几个进程需要在 while 循环中运行。基本上,这些进程先通过 do_performance 计算出一些值,然后我希望按照一定的规则(如 mc_scheduler 方法所示)对这些值进行处理,直到计算出的值满足 while 循环中预定义的条件。
下面是我简化后的代码。其中有两个 Python 类:一个是 multiprocessing.Process 的子类(class Worker),另一个是我自己的类(class MonteCarlo)。
import ctypes
import itertools
import multiprocessing as mp
import time
from datetime import datetime

import numpy as np
from sympy.combinatorics.permutations import Permutation
from sympy.utilities.iterables import multiset_permutations
from multiprocessing import Pool, Process, Queue, Manager, Value


def do_performance(i, j):
    """Return the dot product of ``i`` and ``j`` after flattening both to 1-D.

    Args:
        i: Array-like of any shape.
        j: Array-like with the same total number of elements as ``i``.

    Returns:
        The scalar ``np.dot`` of the two flattened inputs.

    Raises:
        ValueError: If the flattened inputs differ in length (raised by
            ``np.dot``).
    """
    # Flatten first so inputs of any matching size reduce to a single scalar.
    return np.dot(np.ravel(i), np.ravel(j))
class Worker(mp.Process):
    """Process that scores batches of indices pulled from a queue.

    Each item read from ``in_queue`` is an iterable of indices. The worker
    scores every index with ``do_performance`` and puts the sorted result on
    ``out_queue``. Reading ``None`` from ``in_queue`` stops the worker.
    """

    def __init__(self, MDarr, in_queue, out_queue):
        """Store the data array and the two queues used by ``run``.

        Args:
            MDarr: Indexable data; ``mcSimulation`` reads ``MDarr[0]`` and
                ``MDarr[1]``.
            in_queue: Queue-like object providing ``get()`` for work items.
            out_queue: Queue-like object providing ``put()`` for results.
        """
        super().__init__()
        self.MDarr = MDarr
        self.in_queue = in_queue
        self.out_queue = out_queue

    def mcSimulation(self, replicaData, MDarr):
        """Score every index in ``replicaData`` and return the lowest scores.

        Args:
            replicaData: Iterable of indices into the second axis of ``MDarr``.
            MDarr: Data indexed as ``MDarr[0][i, :, :]`` and ``MDarr[1][i, :]``.

        Returns:
            A list of at most 1000 ``(index, score)`` tuples sorted by
            ascending score.
        """
        scored = [
            (idx, do_performance(MDarr[0][idx, :, :], MDarr[1][idx, :]))
            for idx in replicaData
        ]
        scored.sort(key=lambda pair: pair[1])
        return scored[:1000]

    def run(self):
        """Process work items until a ``None`` sentinel is received."""
        while True:
            input_list = self.in_queue.get()
            if input_list is None:
                # Sentinel: lets the parent shut the worker down cleanly
                # instead of leaving it blocked on get() forever.
                break
            # sleep to allow the other workers a chance (b/c the work action is too simple)
            time.sleep(1)
            # put the transformed work on the queue and do simulation
            self.out_queue.put(self.mcSimulation(input_list, self.MDarr))
class MonteCarlo:
    """Feed index batches to ``Worker`` processes until scores stop improving.

    ``run`` starts the workers, enqueues work, and repeatedly passes each
    result to ``mc_scheduler``. The loop ends once 100 consecutive results
    fail to beat the best score seen so far.
    """

    def __init__(self,):
        super().__init__()
        self.initialize_simulation()

    def initialize_simulation(self):
        """Create the example data, the shared-memory array and the queues."""
        # define variables
        self.n_proc, self.n_replica = 5, 50
        self.initIdx, self.endIdx = 135, 60918
        self.n_traj, self.n_atom = 50000, 22
        # generate randomized numpy array for example
        self.MDarr = np.random.normal(size=(self.n_traj, self.n_atom, 3))
        # generate some of list as input into mp.Manager.Queue()
        set_of_list = np.random.choice(np.arange(self.n_traj), self.n_replica)
        self.input_list = list(
            itertools.islice(multiset_permutations(set_of_list), 1000)
        )
        # generate shared_memory array
        self.sharedBaseArr = mp.Array(
            ctypes.c_double, 2 * self.n_traj * self.n_atom * 3, lock=False
        )
        # The view must account for every element of the buffer: two copies
        # of an (n_traj, n_atom, 3) array. Omitting n_atom makes reshape fail.
        self.main_NpArray = np.frombuffer(
            self.sharedBaseArr, dtype=ctypes.c_double
        ).reshape(2, self.n_traj, self.n_atom, 3)
        # Broadcasts MDarr into both halves of the shared array.
        np.copyto(self.main_NpArray, self.MDarr)
        assert self.main_NpArray.base.base is self.sharedBaseArr, 'shared base array has different shape with main numpy array'
        self.replica_manager = mp.Manager()
        self.in_queue = self.replica_manager.Queue()
        self.out_queue = self.replica_manager.Queue()
        # Scheduler state; ``previous_result is None`` marks "no result yet".
        self.previous_result = None
        self.scheduler_val = 0
        self.min_val = float("inf")

    def mc_scheduler(self, result):
        """Update the best score and the no-improvement counter.

        Args:
            result: List of ``(index, score)`` tuples.

        The first call stores ``result`` as the baseline and records its
        lowest score in ``min_val``. Later calls reset ``scheduler_val`` to 0
        and lower ``min_val`` when any score beats it; otherwise they
        increment ``scheduler_val``.
        """
        if self.previous_result is None:
            self.scheduler_val = 0
            self.previous_result = result
            # An empty first batch leaves the baseline at +inf rather than
            # raising, so any later score counts as an improvement.
            self.min_val = min(
                (entry[1] for entry in result), default=float("inf")
            )
            return

        self.optimal = result
        self.out = [entry for entry in result if entry[1] < self.min_val]
        if self.out:
            # reset mc_scheduler
            self.scheduler_val = 0
            self.min_val = min(entry[1] for entry in self.out)
        else:
            self.scheduler_val += 1

    def run(self):
        """Start the workers and iterate until 100 results bring no improvement."""
        print(f"Start code {datetime.now()}")
        print(f"construct the {self.n_proc} workers (mp.Process)")
        print(f"fork and start child process")
        workers = [
            Worker(self.main_NpArray, self.in_queue, self.out_queue)
            for _ in range(self.n_proc)
        ]
        for worker in workers:
            worker.start()
        try:
            print("add data to the manager.queue for multi-processes")
            for replica_set in self.input_list:
                self.in_queue.put(replica_set)
            print("update initial results")
            self.previous_result = None
            self.mc_scheduler(list(self.out_queue.get()))
            while self.scheduler_val < 100:
                # From the action value obtained from each process, get
                # Action results from self.out_queue
                result = list(self.out_queue.get())
                # compare previous results
                self.mc_scheduler(result)
                print(f">> {self.scheduler_val}")
                # generate the new input list
                # NOTE(review): each iteration consumes one result but
                # enqueues up to 1000 new work items, so in_queue grows
                # faster than it drains — confirm this is intended.
                set_of_list = np.random.choice(
                    np.arange(self.n_traj), self.n_replica
                )
                new_inputs = itertools.islice(
                    multiset_permutations(set_of_list), 1000
                )
                for new_input in new_inputs:
                    self.in_queue.put(new_input)
        finally:
            # Workers block on in_queue.get() indefinitely; stop them
            # explicitly so the parent process can exit.
            for worker in workers:
                worker.terminate()
            for worker in workers:
                worker.join()
if __name__ == '__main__':
    # The guard keeps child processes that re-import this module (spawn
    # start method) from launching the simulation again.
    sample_obj = MonteCarlo()
    sample_obj.run()
我的问题是
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)