问题描述
我有一些嵌套循环(总共3个),在这里我尝试使用numpy.einsum加快计算速度,但是我在努力使符号正确。我设法摆脱了一个循环,但我想不出另外两个循环。这是到目前为止我得到的:
import numpy as np
import time
def myfunc(r,q,f):
nr = r.shape[0]
nq = q.shape[0]
y = np.zeros(nq)
for ri in range(nr):
for qi in range(nq):
y[qi] += np.einsum('i,i',f[ri,qi]*f[:,qi],np.sinc(q[qi]*r[ri,:]/np.pi))
return y
r = np.random.random(size=(1000,1000))
q = np.linspace(0,1,1001)
f = np.random.random(size=(r.shape[0],q.shape[0]))
start = time.time()
y = myfunc(r,f)
end = time.time()
print(end-start)
虽然这比原始速度快得多,但是仍然太慢,大约需要30秒。请注意,没有einsum调用的原始文件如下(它似乎需要花费约2.5个小时,没有等待确定的时间):
def myfunc(r,f):
nr = r.shape[0]
nq = q.shape[0]
y = np.zeros(nq)
for ri in range(nr):
for rj in range(nr):
for qi in range(nq):
y[qi] += f[ri,qi]*f[rj,qi]*np.sinc(q[qi]*r[ri,rj]/np.pi))
return y
有人知道如何用einsum或其他工具摆脱这些循环吗?
解决方法
您的功能似乎等同于以下内容:
# this is so called broadcasting
s = np.sinc(q * r[...,None]/np.pi)
np.einsum('iq,jq,ijq->q',f,s)
在我的系统上花费了大约20秒的时间,大部分时间用于分配s
。
让我们测试一下一个小样本:
np.random.seed(1)
r = np.random.random(size=(10,10))
q = np.linspace(0,1,1001)
f = np.random.random(size=(r.shape[0],q.shape[0]))
(np.abs(np.einsum('iq,s) - myfunc(r,q,f)) < 1e-6).all()
# True
由于np.sinc
不是线性运算符,所以我不确定如何进一步缩短运行时间。
sinc
是实际的瓶颈,@Quang Hoang's post中也提到过。我们将从那里开始使用einsum
表达式,以一种类似的方式结束-
现在,从docs
,numpy.sinc(x)
是:\sin(\pi x)/(\pi x)
。我们将利用它-
v = q*r[...,None]
p = np.sin(v)/v
mask = (q==0) | (r==0)[...,None]
p[mask] = 1
out = np.einsum('iq,p)
此外,对于大数据,我们可以使用numexpr
来利用多核,就像这样-
import numexpr as ne
p = ne.evaluate('sin(q*r3D)/(q*r3D)',{'r3D':r[...,None]})
mask = (q==0) | (r==0)[...,p)
具有500个长度数组的计时-
In [12]: r = np.random.random(size=(500,500))
...: q = np.linspace(0,501)
...: f = np.random.random(size=(r.shape[0],q.shape[0]))
# Original soln with einsum
In [15]: %%timeit
...: nr = r.shape[0]
...: nq = q.shape[0]
...: y = np.zeros(nq)
...: for ri in range(nr):
...: for qi in range(nq):
...: y[qi] += np.einsum('i,i',f[ri,qi]*f[:,qi],np.sinc(q[qi]*r[ri,:]/np.pi))
9.75 s ± 977 ms per loop (mean ± std. dev. of 7 runs,1 loop each)
# @Quang Hoang's soln
In [16]: %%timeit
...: s = np.sinc(q * r[...,None]/np.pi)
...: np.einsum('iq,s)
2.75 s ± 7.82 ms per loop (mean ± std. dev. of 7 runs,1 loop each)
In [17]: %%timeit
...: p = ne.evaluate('sin(q3D*r)/(q3D*r)',{'q3D':q[:,None,None]})
...: mask = (q==0)[:,None] | (r==0)
...: p[mask] = 1
...: out = np.einsum('iq,qij->q',p)
1.39 s ± 23.5 ms per loop (mean ± std. dev. of 7 runs,1 loop each)
In [18]: %%timeit
...: v = q*r[...,None]
...: p = np.sin(v)/v
...: mask = (q==0) | (r==0)[...,None]
...: p[mask] = 1
...: out = np.einsum('iq,p)
2.11 s ± 7.42 ms per loop (mean ± std. dev. of 7 runs,1 loop each)
有了更大的数据,我们希望numexpr
的性能更好,只要不遇到内存不足的情况。
最简单的方法(可能也是性能最高的方法)是使用编译器,例如Numba。由于此功能取决于const test: BarCharState = {
labels: ['foo','bar'],datasets: [
{
label: 'foo',backgroundColor: '#fefefe',hoverBackgroundColor: '#ffffff',data: ['a','b'],barThickness: 3,},]
}
功能,因此请确保已安装Intel SVML。
示例
sinc
带有小数组的定时
import numpy as np
import numba as nb
@nb.njit(fastmath=True,parallel=False,error_model="numpy",cache=True)
def myfunc(r,f):
nr = r.shape[0]
nq = q.shape[0]
y = np.zeros(nq)
for ri in range(nr):
for rj in range(nr):
for qi in range(nq):
y[qi] += f[ri,qi]*f[rj,qi]*np.sinc(q[qi]*r[ri,rj]/np.pi)
return y
@nb.njit(fastmath=True,parallel=True,cache=True)
def myfunc_opt(r,f):
nr = r.shape[0]
nq = q.shape[0]
y = np.empty(nq)
#for contiguous memory access in the loop
f_T=np.ascontiguousarray(f.T)
for qi in nb.prange(nq):
acc=0
for ri in range(nr):
for rj in range(nr):
acc += f_T[qi,ri]*f_T[qi,rj]*np.sinc(q[qi]*r[ri,rj]/np.pi)
y[qi]=acc
return y
@nb.njit(fastmath=True,cache=True)
def myfunc_opt_2(r,f):
nr = r.shape[0]
nq = q.shape[0]
y = np.empty(nq)
f_T=np.ascontiguousarray(f.T)
for qi in nb.prange(nq):
acc=0
for ri in range(nr):
for rj in range(nr):
#Test carefully!
if q[qi]*r[ri,rj]!=0.:
acc += f_T[qi,rj]*np.sin(q[qi]*r[ri,rj])/(q[qi]*r[ri,rj])
else:
acc += f_T[qi,rj]
y[qi]=acc
return y
def numpy_func(r,f):
s = np.sinc(q * r[...,None]/np.pi)
return np.einsum('iq,s)
具有较大数组的定时
r = np.random.random(size=(500,500))
q = np.linspace(0,501)
f = np.random.random(size=(r.shape[0],q.shape[0]))
%timeit y = myfunc(r,f)
#765 ms ± 1.85 ms per loop (mean ± std. dev. of 7 runs,1 loop each)
%timeit y = myfunc_opt(r,f)
#158 ms ± 2.59 ms per loop (mean ± std. dev. of 7 runs,1 loop each)
%timeit y = myfunc_opt_2(r,f)
#51.5 ms ± 1.17 ms per loop (mean ± std. dev. of 7 runs,10 loops each)
%timeit y = numpy_func(r,f)
#3.81 s ± 61.9 ms per loop (mean ± std. dev. of 7 runs,1 loop each)
print(np.allclose(numpy_func(r,f),myfunc(r,f)))
#True
print(np.allclose(numpy_func(r,myfunc_opt(r,myfunc_opt_2(r,f)))
,
我之所以写此答案,是因为我从@Quang Hoang's post中学到了很多关于einsum
的知识,如果我分享解决问题的想法,它将加深我的理解。
问题是要为{p> 1设计适当的操作
einsum
-
查看相关数组的形状。输入:
y[qi] += np.einsum('i,:]/np.pi))
,r --> (a,a)
和q --> (c,)
;输出:f --> (a,c)
。从这些形状中,对于y --> (c,)
部分,必须通过q[qi]*r[ri,:]
之类的东西来创建形状为(a,a,c)
的新数组。 -
由于
r[...,None]*q
不会改变数组的形状,并且对于固定对的np.sinc
,(ri,qi)
只是一个数字,我们应该首先考虑{{1 }}操作将重现f[ri,qi]
,即如何从形状einsum
和f[:,:]/np.pi)
的两个数组中获得形状(a,c)
的数组。直观地,它是(a,c)
。 -
对于一对固定的
(a,c)
,由于kl,ikl->il
给出了形状为(ri,qi)
的数组,并且f[:,:]/np.pi)
的形状为(a,c)
,因此最终操作只需f
基于以上分析,我们有解决方案:
(a,c)