问题描述
我需要对numpy数组进行多重处理。 (我只是无法使用numba加快功能)
它以数组作为输入,并将数据馈送到multiprocessing.pool
上以在其上执行功能func
。
func
进行一些操作并返回子数组,该子数组及其编号位置在原始数组中堆叠(=及其位置)。
池函数在可迭代对象func
中返回经过多处理处理的函数out
的结果。
如何将可迭代对象转换回numpy数组?
可复制的代码:
import numpy as np,time,multiprocessing as mp,pandas as pd; min_ = 0.7
def sub_sub_func(X,newmin,newmax):
if len(X) >1:
if (X[0] == X[1:]).all(): X.fill(newmax)
else:
X_std = (X - X.min(axis=0)) / (X.max(axis=0) - X.min(axis=0))
return X_std * (newmax - newmin)+newmin
elif len(X) ==1:
X[0] = newmax
return X
def sub_func(arr):
if np.min(arr)<=0:
arr[arr<=0] = sub_sub_func(arr[arr<=0],min_*0.65,min_)
elif np.min(arr)<min_:
arr[arr<=min_] = sub_sub_func(arr[arr<=min_],min_*0.80,min_)
return arr
def func(mol_subrange,arr):
result= np.array([slice_+slice for slice_ in arr[mol_subrange] ])
return np.column_stack((numberlocations,result)) # return it with its numberlocations
def mp_list_o_arr_comprehension(func,full_arr_to_process,numThreads=4,mpBatches=1):
molecule_subrange = np.array(range(len(full_arr_to_process))),parts = linParts(len(molecule_subrange),numThreads*mpBatches)
jobs=[]
for i in range(1,len(parts)):
job={'mol_subrange':molecule_subrange[parts[i-1]:parts[i]],'arr': full_arr_to_process[parts[i-1]:parts[i]],'func':func}
jobs.append(job)
pool=mp.Pool(processes=numThreads)
outputs=pool.imap_unordered(expandCall,jobs)
out_list = []
for out_ in outputs:
out_list.append(out_.get())
pool.close(); pool.join() # this is needed to prevent memory leaks return out
locs_arr,out_arr = np.array([]),np.array([])
for out_ in out_list:
out_locs = np.asarray(out_)[:,0]
out_vals = np.asarray(out_)[:,1]
out_arr = np.concatenate((out_arr,out_vals))
locs_arr = np.concatenate((locs_arr,out_locs))
#sort order by converting it into a pandas series
result = pd.series(out_arr,index=locs_arr).sort_index()
return np.array(result)
def linParts(numAtoms,numThreads):
# partition of atoms with a single loop
parts=np.linspace(0,numAtoms,min(numThreads,numAtoms)+1)
parts=np.ceil(parts).astype(int)
return parts
def expandCall(kargs):
# Expand the arguments of a callback function,kargs[’func’]
func=kargs['func']
del kargs['func']
out=func(**kargs)
return out
if __name__=='__main__':
LEN = 10000; temp = np.random.randint(1,high=100,size=LEN)
a = [np.random.uniform(size=rand) for rand in temp]
result = mp_list_o_arr_comprehension(func,a,mpBatches=10)
它会产生此错误:
RemoteTraceback:
"""
Traceback (most recent call last):
File "c:\...\pool.py",line 125,in worker
result = (True,func(*args,**kwds))
File "D:\... .py",line 67,in expandCall
out=func(**kargs)
File "D:\... .py",line 22,in func
result= np.array([slice_+slice for slice_ in arr[mol_subrange] ])
TypeError: list indices must be integers or slices,not tuple
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "D:\... .py",line 73,in <module>
result = mp_list_o_arr_comprehension(func,mpBatches=10)
File "D:\... ",line 40,in mp_list_o_arr_comprehension
for out_ in outputs:
File "c:\...\pool.py",line 868,in next
raise value
TypeError: list indices must be integers or slices,not tuple
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)