如何从多处理中提取返回的“对象”?

问题描述

我需要对numpy数组进行多重处理。 (我只是无法使用numba加快功能) 它以数组作为输入,并将数据馈送到multiprocessing.pool上以在其上执行功能funcfunc进行一些操作并返回子数组,该子数组及其编号位置在原始数组中堆叠(=及其位置)。 池函数在可迭代对象func中返回经过多处理处理的函数out的结果。 如何将可迭代对象转换回numpy数组?

可复制的代码

import numpy as np,time,multiprocessing as mp,pandas as pd; min_ = 0.7

def sub_sub_func(X,newmin,newmax):  
    if len(X) >1: 
        if  (X[0] == X[1:]).all():       X.fill(newmax)
        else: 
            X_std = (X - X.min(axis=0)) / (X.max(axis=0) - X.min(axis=0))
            return X_std * (newmax - newmin)+newmin    
    elif len(X) ==1:    
        X[0] = newmax    
    return X 

def sub_func(arr):   
    if np.min(arr)<=0:       
        arr[arr<=0] = sub_sub_func(arr[arr<=0],min_*0.65,min_)  
    elif np.min(arr)<min_:    
        arr[arr<=min_] = sub_sub_func(arr[arr<=min_],min_*0.80,min_)  
    return arr  

def func(mol_subrange,arr):
    result= np.array([slice_+slice for slice_ in  arr[mol_subrange] ]) 
    return np.column_stack((numberlocations,result)) # return it with its numberlocations

def mp_list_o_arr_comprehension(func,full_arr_to_process,numThreads=4,mpBatches=1):  
    molecule_subrange = np.array(range(len(full_arr_to_process))),parts             = linParts(len(molecule_subrange),numThreads*mpBatches) 
    
    jobs=[]
    for i in range(1,len(parts)):
        job={'mol_subrange':molecule_subrange[parts[i-1]:parts[i]],'arr':         full_arr_to_process[parts[i-1]:parts[i]],'func':func}
        jobs.append(job)
       
    pool=mp.Pool(processes=numThreads) 
    outputs=pool.imap_unordered(expandCall,jobs) 
    
    out_list = []
    for out_ in outputs:
        out_list.append(out_.get())
    pool.close(); pool.join() # this is needed to prevent memory leaks return out 
    
    locs_arr,out_arr = np.array([]),np.array([])  
    for out_ in out_list: 
        out_locs = np.asarray(out_)[:,0]
        out_vals = np.asarray(out_)[:,1]
        
        out_arr  = np.concatenate((out_arr,out_vals))
        locs_arr = np.concatenate((locs_arr,out_locs))
           
    #sort order by converting it into a pandas series
    result = pd.series(out_arr,index=locs_arr).sort_index()  
    
    return np.array(result)

def linParts(numAtoms,numThreads):
    # partition of atoms with a single loop
    parts=np.linspace(0,numAtoms,min(numThreads,numAtoms)+1)
    parts=np.ceil(parts).astype(int)
    return parts

def expandCall(kargs): 
    # Expand the arguments of a callback function,kargs[’func’] 
    func=kargs['func'] 
    del kargs['func']  
    out=func(**kargs)  
    return out 

if __name__=='__main__': 
    LEN = 10000; temp = np.random.randint(1,high=100,size=LEN) 
    a  = [np.random.uniform(size=rand) for rand in temp]  
    result = mp_list_o_arr_comprehension(func,a,mpBatches=10) 

它会产生此错误

RemoteTraceback: 
"""
Traceback (most recent call last):
  File "c:\...\pool.py",line 125,in worker
    result = (True,func(*args,**kwds))
  File "D:\... .py",line 67,in expandCall
    out=func(**kargs)
  File "D:\... .py",line 22,in func
    result= np.array([slice_+slice for slice_ in  arr[mol_subrange] ])
TypeError: list indices must be integers or slices,not tuple
"""


The above exception was the direct cause of the following exception:

Traceback (most recent call last):

  File "D:\... .py",line 73,in <module>
    result = mp_list_o_arr_comprehension(func,mpBatches=10)

  File "D:\... ",line 40,in mp_list_o_arr_comprehension
    for out_ in outputs:

  File "c:\...\pool.py",line 868,in next
    raise value

TypeError: list indices must be integers or slices,not tuple

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)