用numpy.einsum删除循环

问题描述

我有一些嵌套循环(总共3个),在这里我尝试使用numpy.einsum加快计算速度,但是我在努力使符号正确。我设法摆脱了一个循环,但我想不出另外两个循环。这是到目前为止我得到的:

import numpy as np
import time

def myfunc(r,q,f):
    nr = r.shape[0]
    nq = q.shape[0]
    y = np.zeros(nq)
    for ri in range(nr):
        for qi in range(nq):
            y[qi] += np.einsum('i,i',f[ri,qi]*f[:,qi],np.sinc(q[qi]*r[ri,:]/np.pi))
    return y

r = np.random.random(size=(1000,1000))
q = np.linspace(0,1,1001)
f = np.random.random(size=(r.shape[0],q.shape[0]))

start = time.time()
y = myfunc(r,f)
end = time.time()

print(end-start)

虽然这比原始速度快得多,但是仍然太慢,大约需要30秒。请注意,没有einsum调用的原始文件如下(它似乎需要花费约2.5个小时,没有等待确定的时间):

def myfunc(r,f):
    nr = r.shape[0]
    nq = q.shape[0]
    y = np.zeros(nq)
    for ri in range(nr):
        for rj in range(nr):
            for qi in range(nq):
                y[qi] += f[ri,qi]*f[rj,qi]*np.sinc(q[qi]*r[ri,rj]/np.pi))
    return y

有人知道如何用einsum或其他工具摆脱这些循环吗?

解决方法

您的功能似乎等同于以下内容:

# this is so called broadcasting
s = np.sinc(q * r[...,None]/np.pi)

np.einsum('iq,jq,ijq->q',f,s)

在我的系统上花费了大约20秒的时间,大部分时间用于分配s

让我们测试一下一个小样本:

np.random.seed(1)
r = np.random.random(size=(10,10))
q = np.linspace(0,1,1001)
f = np.random.random(size=(r.shape[0],q.shape[0]))
(np.abs(np.einsum('iq,s) - myfunc(r,q,f)) < 1e-6).all()
# True

由于np.sinc不是线性运算符,所以我不确定如何进一步缩短运行时间。

,

sinc是实际的瓶颈,@Quang Hoang's post中也提到过。我们将从那里开始使用einsum表达式,以一种类似的方式结束-

现在,从docsnumpy.sinc(x)是:\sin(\pi x)/(\pi x)。我们将利用它-

v = q*r[...,None]
p = np.sin(v)/v
mask = (q==0) | (r==0)[...,None]
p[mask] = 1
out = np.einsum('iq,p)

此外,对于大数据,我们可以使用numexpr来利用多核,就像这样-

import numexpr as ne

p = ne.evaluate('sin(q*r3D)/(q*r3D)',{'r3D':r[...,None]})
mask = (q==0) | (r==0)[...,p)

具有500个长度数组的计时-

In [12]: r = np.random.random(size=(500,500))
    ...: q = np.linspace(0,501)
    ...: f = np.random.random(size=(r.shape[0],q.shape[0]))

# Original soln with einsum
In [15]: %%timeit
    ...: nr = r.shape[0]
    ...: nq = q.shape[0]
    ...: y = np.zeros(nq)
    ...: for ri in range(nr):
    ...:     for qi in range(nq):
    ...:         y[qi] += np.einsum('i,i',f[ri,qi]*f[:,qi],np.sinc(q[qi]*r[ri,:]/np.pi))
9.75 s ± 977 ms per loop (mean ± std. dev. of 7 runs,1 loop each)

# @Quang Hoang's soln
In [16]: %%timeit
    ...: s = np.sinc(q * r[...,None]/np.pi)
    ...: np.einsum('iq,s)
2.75 s ± 7.82 ms per loop (mean ± std. dev. of 7 runs,1 loop each)

In [17]: %%timeit
    ...: p = ne.evaluate('sin(q3D*r)/(q3D*r)',{'q3D':q[:,None,None]})
    ...: mask = (q==0)[:,None] | (r==0)
    ...: p[mask] = 1
    ...: out = np.einsum('iq,qij->q',p)
1.39 s ± 23.5 ms per loop (mean ± std. dev. of 7 runs,1 loop each)

In [18]: %%timeit
    ...: v = q*r[...,None]
    ...: p = np.sin(v)/v
    ...: mask = (q==0) | (r==0)[...,None]
    ...: p[mask] = 1
    ...: out = np.einsum('iq,p)
2.11 s ± 7.42 ms per loop (mean ± std. dev. of 7 runs,1 loop each)

有了更大的数据,我们希望numexpr的性能更好,只要不遇到内存不足的情况。

,

最简单的方法(可能也是性能最高的方法)是使用编译器,例如Numba。由于此功能取决于const test: BarCharState = { labels: ['foo','bar'],datasets: [ { label: 'foo',backgroundColor: '#fefefe',hoverBackgroundColor: '#ffffff',data: ['a','b'],barThickness: 3,},] } 功能,因此请确保已安装Intel SVML

示例

sinc

带有小数组的定时

import numpy as np
import numba as nb

@nb.njit(fastmath=True,parallel=False,error_model="numpy",cache=True)
def myfunc(r,f):
    nr = r.shape[0]
    nq = q.shape[0]
    y = np.zeros(nq)
    for ri in range(nr):
        for rj in range(nr):
            for qi in range(nq):
                y[qi] += f[ri,qi]*f[rj,qi]*np.sinc(q[qi]*r[ri,rj]/np.pi)
    return y

@nb.njit(fastmath=True,parallel=True,cache=True)
def myfunc_opt(r,f):
    nr = r.shape[0]
    nq = q.shape[0]
    y = np.empty(nq)

    #for contiguous memory access in the loop
    f_T=np.ascontiguousarray(f.T)
    for qi in nb.prange(nq):
        acc=0
        for ri in range(nr):
            for rj in range(nr):
                acc += f_T[qi,ri]*f_T[qi,rj]*np.sinc(q[qi]*r[ri,rj]/np.pi)
        y[qi]=acc
    return y

@nb.njit(fastmath=True,cache=True)
def myfunc_opt_2(r,f):
    nr = r.shape[0]
    nq = q.shape[0]
    y = np.empty(nq)


    f_T=np.ascontiguousarray(f.T)
    for qi in nb.prange(nq):
        acc=0
        for ri in range(nr):
            for rj in range(nr):
                #Test carefully!
                if q[qi]*r[ri,rj]!=0.:
                    acc += f_T[qi,rj]*np.sin(q[qi]*r[ri,rj])/(q[qi]*r[ri,rj])
                else:
                    acc += f_T[qi,rj]
        y[qi]=acc
    return y

def numpy_func(r,f):
    s = np.sinc(q * r[...,None]/np.pi)
    return np.einsum('iq,s)

具有较大数组的定时

r = np.random.random(size=(500,500))
q = np.linspace(0,501)
f = np.random.random(size=(r.shape[0],q.shape[0]))
%timeit y = myfunc(r,f)
#765 ms ± 1.85 ms per loop (mean ± std. dev. of 7 runs,1 loop each)
%timeit y = myfunc_opt(r,f)
#158 ms ± 2.59 ms per loop (mean ± std. dev. of 7 runs,1 loop each)
%timeit y = myfunc_opt_2(r,f)
#51.5 ms ± 1.17 ms per loop (mean ± std. dev. of 7 runs,10 loops each)
%timeit y = numpy_func(r,f)
#3.81 s ± 61.9 ms per loop (mean ± std. dev. of 7 runs,1 loop each)
print(np.allclose(numpy_func(r,f),myfunc(r,f)))
#True
print(np.allclose(numpy_func(r,myfunc_opt(r,myfunc_opt_2(r,f)))
,

我之所以写此答案,是因为我从@Quang Hoang's post中学到了很多关于einsum的知识,如果我分享解决问题的想法,它将加深我的理解。

问题是要为{p> 1设计适当的操作

einsum

  1. 查看相关数组的形状。输入:y[qi] += np.einsum('i,:]/np.pi))r --> (a,a)q --> (c,);输出:f --> (a,c)。从这些形状中,对于y --> (c,)部分,必须通过q[qi]*r[ri,:]之类的东西来创建形状为(a,a,c)的新数组。

  2. 由于r[...,None]*q不会改变数组的形状,并且对于固定对的np.sinc(ri,qi)只是一个数字,我们应该首先考虑{{1 }}操作将重现f[ri,qi],即如何从形状einsumf[:,:]/np.pi)的两个数组中获得形状(a,c)的数组。直观地,它是(a,c)

  3. 对于一对固定的(a,c),由于kl,ikl->il给出了形状为(ri,qi)的数组,并且f[:,:]/np.pi)的形状为(a,c),因此最终操作只需f

基于以上分析,我们有解决方案:

(a,c)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...