问题描述
假设有一个形状为(N,M)的numpy数组m,我想计算
res = np.zeros((M,M))
for i in range(N):
res += np.outer(m[i],m[i])
使用einsum
(即
res = np.sum(np.einsum('ij,ik->ijk',m,axis=0)
但这需要存储一个N x M x M的矩阵,这可能非常(在我的情况下是)。
我想使用并列化在cython中构建此功能
import numpy as np
cimport numpy as np
from cython.parallel import prange
def get_s(double[:,:] m):
cdef Py_ssize_t i = 0
cdef int n = m.shape[1]
res = 0.
for i in prange(n,nogil=True):
res += np.outer(m[i],m[i])
return res
这段代码的想法是
这段代码的运行会产生很多错误,因为我使用的是python对象,不允许使用操作,而且我不知道如何正确初始化res
。
解决方法
nogil上下文不能使用numpy函数(np.outer
)。因此,您只需使用循环将其拼出即可。
此外,您的res
变量似乎是一个数组,因此您需要声明一个变量并将其初始化。
最后,您希望将循环编译为C,从而使用类型化的内存视图。使用numpy数组进行内存管理并获取它们的内存视图是最简单的。一起考虑,
%%cython -a
cimport cython
import numpy as np
@cython.boundscheck(False)
@cython.wraparound(False)
def m_outer(double[:,::1] a):
n,m = a.shape[0],a.shape[1]
cdef double[:,::1] resm = np.zeros((m,m))
for i in range(a.shape[0]):
for j in range(a.shape[1]):
for k in range(a.shape[1]):
resm[j,k] += a[i,j] * a[i,k]
return np.asarray(resm)
一种写这些东西的方法(也许是 的方法)是用python写(不要考虑速度),在一个小例子中验证输出(我使用3-by-4),然后进行cythonize。
进行cythonize时,请使用%cython -a
并检查生成的C代码。
现在,这里有两个明显的机会:重新排列循环以提升循环常数并使用prange。两者都留给读者练习。
最后一点。除非是出于教育目的,否则请注意,您真正计算的是矩阵乘积A.T @ A
。
您的迭代:
In [139]: res = np.zeros((4,4))
In [140]: for i in range(3): res += np.outer(m[i],m[i])
In [141]: res
Out[141]:
array([[ 80.,92.,104.,116.],[ 92.,107.,122.,137.],[104.,140.,158.],[116.,137.,158.,179.]])
我们可以在广播中执行相同的outer
:
In [142]: np.sum(m[:,:,None]*m[:,None,:],axis=0)
Out[142]:
array([[ 80,92,104,116],[ 92,107,122,137],[104,140,158],[116,137,158,179]])
(是的,这确实构成了一个临时(N,M,M)数组)
建议的单步求和:
In [143]: np.einsum('ij,ik->jk',m,m)
Out[143]:
array([[ 80,179]])
这只是一个简单的点积(具有适当的转置):
In [144]: m.T.dot(m)
Out[144]:
array([[ 80,179]])
In [145]: m.T@m
Out[145]:
array([[ 80,179]])
由于numpy
点使用了快速的BLAS代码,因此我怀疑您是否可以使用cython
对其进行改进。