问题描述
Numpy的范围仅接受用于开始/停止/步进的单个标量值。此功能是否有多种版本?哪个可以接受数组输入作为开始/停止/步进?例如。输入二维数组,如:
[[1 5 1],# start/stop/step first
[3 8 2]] # start/stop/step second
应为输入的每一行(每个开始/停止/步骤)创建一个由范围的串联组成的数组,上面的输入应创建一维数组
1 2 3 4 3 5 7
即我们需要设计接下来要执行的功能:
print(np.multi_arange(np.array([[1,5,1],[3,8,2]])))
# prints:
# array([1,2,3,4,7])
此函数应该高效(纯numpy),即非常快的(10000,3)
形状的过程输入数组,而没有纯Python循环。
当然可以创建纯Python的循环(或listcomp)以为每一行创建一个范围,并连接该循环的结果。但是我有很多行具有三元组的开始/停止/步进,并且需要高效且快速的代码,因此需要寻找纯粹的numpy函数。
我为什么需要它。我需要几个任务。其中之一是用于索引的-假设我有1D数组a
,并且我需要提取此数组的许多(可能是相交的)子范围。如果我有该版本的arange的多版本,我会做:
values = a[np.multi_arange(starts_stops_steps)]
也许可以使用numpy函数的某些组合来创建多范围函数?你能建议吗?
在提取一维数组子范围的特定情况下,也许还有一些更有效的解决方案(请参见上面的代码最后一行),而无需使用multi_arange创建所有索引?
解决方法
这是带有cumsum
的矢量化图片,用于说明正步长和负步长-
def multi_arange(a):
steps = a[:,2]
lens = ((a[:,1]-a[:,0]) + steps-np.sign(steps))//steps
b = np.repeat(steps,lens)
ends = (lens-1)*steps + a[:,0]
b[0] = a[0,0]
b[lens[:-1].cumsum()] = a[1:,0] - ends[:-1]
return b.cumsum()
如果您需要验证有效范围:(start < stop when step > 0)
和(start > stop when step < 0)
,请使用预处理步骤:
a = a[((a[:,1] > a[:,0]) & (a[:,2]>0) | (a[:,1] < a[:,2]<0))]
样品运行-
In [17]: a
Out[17]:
array([[ 1,5,1],[ 3,8,2],[18,6,-2]])
In [18]: multi_arange(a)
Out[18]: array([ 1,2,3,4,7,18,16,14,12,10,8])
,
In [1]: np.r_[1:5:1,3:8:2]
Out[1]: array([1,7])
In [2]: np.hstack((np.arange(1,1),np.arange(3,2)))
Out[2]: array([1,7])
r_
版本既好又紧凑,但速度不快:
In [3]: timeit np.r_[1:5:1,3:8:2]
23.9 µs ± 34.6 ns per loop (mean ± std. dev. of 7 runs,10000 loops each)
In [4]: timeit np.hstack((np.arange(1,2)))
11.2 µs ± 19.5 ns per loop (mean ± std. dev. of 7 runs,100000 loops each)
,
我刚刚使用numba
提出了解决方案。如果我们发现最好的解决方案不要携带笨重的numba JIT编译器,我仍然更喜欢仅numpy
的解决方案。
我也在我的代码中测试了@Divakar解决方案。
下一个代码输出是:
naive_multi_arange 0.76601 sec
arty_multi_arange 0.01801 sec 42.52 speedup
divakar_multi_arange 0.05504 sec 13.92 speedup
意味着我的numba
解决方案的速度提高了42倍,@ Divakar的numpy
解决方案的速度提高了14倍。
下一个代码也可以是run online here。
import time,random
import numpy as np,numba
@numba.jit(nopython = True)
def arty_multi_arange(a):
starts,stops,steps = a[:,0],a[:,2]
pos = 0
cnt = np.sum((stops - starts + steps - np.sign(steps)) // steps,dtype = np.int64)
res = np.zeros((cnt,),dtype = np.int64)
for i in range(starts.size):
v,stop,step = starts[i],stops[i],steps[i]
if step > 0:
while v < stop:
res[pos] = v
pos += 1
v += step
elif step < 0:
while v > stop:
res[pos] = v
pos += 1
v += step
assert pos == cnt
return res
def divakar_multi_arange(a):
steps = a[:,0] - ends[:-1]
return b.cumsum()
random.seed(0)
neg_prob = 0.5
N = 100000
minv,maxv,maxstep = -100,300,15
steps = [random.randrange(1,maxstep + 1) * ((1,-1)[random.random() < neg_prob]) for i in range(N)]
starts = [random.randrange(minv + 1,maxv) for i in range(N)]
stops = [random.randrange(*(((starts[i] + 1,maxv + 1),(minv,starts[i]))[steps[i] < 0])) for i in range(N)]
joined = np.array([starts,steps],dtype = np.int64).T
tb = time.time()
aref = np.concatenate([np.arange(joined[i,joined[i,dtype = np.int64) for i in range(N)])
npt = time.time() - tb
print('naive_multi_arange',round(npt,5),'sec')
for func in ['arty_multi_arange','divakar_multi_arange']:
globals()[func](joined)
tb = time.time()
a = globals()[func](joined)
myt = time.time() - tb
print(func,round(myt,'sec',round(npt / myt,2),'speedup')
assert a.size == aref.size,(a.size,aref.size)
assert np.all(a == aref),np.vstack((np.flatnonzero(a != aref)[:5],a[a != aref][:5],aref[a != aref][:5])).T