NumPy中有多范围吗?

问题描述

Numpy的范围仅接受用于开始/停止/步进的单个标量值。此功能是否有多种版本?哪个可以接受数组输入作为开始/停止/步进?例如。输入二维数组,如:

[[1 5 1],# start/stop/step first
 [3 8 2]] # start/stop/step second

应为输入的每一行(每个开始/停止/步骤)创建一个由范围的串联组成的数组,上面的输入应创建一维数组

1 2 3 4 3 5 7

即我们需要设计接下来要执行的功能

print(np.multi_arange(np.array([[1,5,1],[3,8,2]])))
# prints:
# array([1,2,3,4,7])

函数应该高效(纯numpy),即非常快的(10000,3)形状的过程输入数组,而没有纯Python循环。

当然可以创建纯Python的循环(或listcomp)以为每一行创建一个范围,并连接该循环的结果。但是我有很多行具有三元组的开始/停止/步进,并且需要高效且快速代码,因此需要寻找纯粹的numpy函数

我为什么需要它。我需要几个任务。其中之一是用于索引的-假设我有1D数组a,并且我需要提取此数组的许多(可能是相交的)子范围。如果我有该版本的arange的多版本,我会做:

values = a[np.multi_arange(starts_stops_steps)]

也许可以使用numpy函数的某些组合来创建多范围函数?你能建议吗?

提取一维数组子范围的特定情况下,也许还有一些更有效的解决方案(请参见上面的代码最后一行),而无需使用multi_arange创建所有索引?

解决方法

这是带有cumsum的矢量化图片,用于说明正步长和负步长-

def multi_arange(a):
    steps = a[:,2]
    lens = ((a[:,1]-a[:,0]) + steps-np.sign(steps))//steps
    b = np.repeat(steps,lens)
    ends = (lens-1)*steps + a[:,0]
    b[0] = a[0,0]
    b[lens[:-1].cumsum()] = a[1:,0] - ends[:-1]
    return b.cumsum()

如果您需要验证有效范围:(start < stop when step > 0)(start > stop when step < 0),请使用预处理步骤:

a = a[((a[:,1] > a[:,0]) & (a[:,2]>0) | (a[:,1] < a[:,2]<0))]

样品运行-

In [17]: a
Out[17]: 
array([[ 1,5,1],[ 3,8,2],[18,6,-2]])

In [18]: multi_arange(a)
Out[18]: array([ 1,2,3,4,7,18,16,14,12,10,8])
,
In [1]: np.r_[1:5:1,3:8:2]
Out[1]: array([1,7])

In [2]: np.hstack((np.arange(1,1),np.arange(3,2)))
Out[2]: array([1,7])

r_版本既好又紧凑,但速度不快:

In [3]: timeit np.r_[1:5:1,3:8:2]
23.9 µs ± 34.6 ns per loop (mean ± std. dev. of 7 runs,10000 loops each)
In [4]: timeit np.hstack((np.arange(1,2)))
11.2 µs ± 19.5 ns per loop (mean ± std. dev. of 7 runs,100000 loops each)
,

我刚刚使用numba提出了解决方案。如果我们发现最好的解决方案不要携带笨重的numba JIT编译器,我仍然更喜欢仅numpy的解决方案。

我也在我的代码中测试了@Divakar解决方案。

下一个代码输出是:

naive_multi_arange 0.76601 sec
arty_multi_arange 0.01801 sec 42.52 speedup
divakar_multi_arange 0.05504 sec 13.92 speedup

意味着我的numba解决方案的速度提高了42倍,@ Divakar的numpy解决方案的速度提高了14倍。

下一个代码也可以是run online here

import time,random
import numpy as np,numba

@numba.jit(nopython = True)
def arty_multi_arange(a):
    starts,stops,steps = a[:,0],a[:,2]
    pos = 0
    cnt = np.sum((stops - starts + steps - np.sign(steps)) // steps,dtype = np.int64)
    res = np.zeros((cnt,),dtype = np.int64)
    for i in range(starts.size):
        v,stop,step = starts[i],stops[i],steps[i]
        if step > 0:
            while v < stop:
                res[pos] = v
                pos += 1
                v += step
        elif step < 0:
            while v > stop:
                res[pos] = v
                pos += 1
                v += step
    assert pos == cnt
    return res
    
def divakar_multi_arange(a):
    steps = a[:,0] - ends[:-1]
    return b.cumsum()
    
random.seed(0)
neg_prob = 0.5
N = 100000
minv,maxv,maxstep = -100,300,15
steps = [random.randrange(1,maxstep + 1) * ((1,-1)[random.random() < neg_prob]) for i in range(N)]
starts = [random.randrange(minv + 1,maxv) for i in range(N)]
stops = [random.randrange(*(((starts[i] + 1,maxv + 1),(minv,starts[i]))[steps[i] < 0])) for i in range(N)]
joined = np.array([starts,steps],dtype = np.int64).T

tb = time.time()
aref = np.concatenate([np.arange(joined[i,joined[i,dtype = np.int64) for i in range(N)])
npt = time.time() - tb
print('naive_multi_arange',round(npt,5),'sec')

for func in ['arty_multi_arange','divakar_multi_arange']:
    globals()[func](joined)
    tb = time.time()
    a = globals()[func](joined)
    myt = time.time() - tb
    print(func,round(myt,'sec',round(npt / myt,2),'speedup')
    assert a.size == aref.size,(a.size,aref.size)
    assert np.all(a == aref),np.vstack((np.flatnonzero(a != aref)[:5],a[a != aref][:5],aref[a != aref][:5])).T