问题描述
我有一个程序,它的主要性能瓶颈涉及将一个维度为 1 的矩阵和另一个大维度的矩阵相乘,例如1000:
large_dimension = 1000
a = np.random.random((1,))
b = np.random.random((1,large_dimension))
c = np.matmul(a,b)
换句话说,将矩阵 b
与标量 a[0]
相乘。
我正在寻找最有效的方法来计算这个,因为这个操作被重复了数百万次。
%timeit np.matmul(a,b)
>> 1.55 µs ± 45.8 ns per loop (mean ± std. dev. of 7 runs,1000000 loops each)
%timeit a[0] * b
>> 1.77 µs ± 34.6 ns per loop (mean ± std. dev. of 7 runs,1000000 loops each)
有没有更有效的方法来计算这个?
- 注意:我无法将这些计算移至 GPU,因为该程序使用多处理,并且许多此类计算是并行完成的。
解决方法
在这种情况下,使用逐元素乘法可能会更快,但您看到的时间主要是Numpy 的开销(从 CPython 解释器调用 C 函数,包装/解包类型、进行检查、执行操作、数组分配等)。
因为这个操作被重复了数百万次
这就是问题所在。事实上,CPython解释器在处理低延迟的事情方面非常糟糕。当您在 Numpy 类型上工作时尤其如此,因为调用 C 代码并执行对琐碎操作的检查比在纯 Python 中执行要慢得多,而纯 Python 也比编译的本机 C/C++ 代码慢得多。如果您真的需要它,并且您不能使用 Numpy 向量化您的代码(因为您有一个迭代时间步长的循环),那么您就不再使用 CPython,或者至少不再使用纯 Python 代码。相反,您可以使用 Numba 或 Cython 来减轻执行 C 调用、包装类型等的影响。如果这还不够,那么您将需要编写原生 C /C++ 代码(或任何类似的语言),除非您找到一个专用的 Python 包 正好为您做这件事。请注意,Numba 仅在适用于本机类型或 Numpy 数组(包含本机类型)时才快速。如果您使用大量纯 Python 类型并且不想重写代码,那么您可以尝试 PyPy JIT。
这是 Numba 中的一个简单示例,它避免了(代价高昂的)创建/分配新数组(以及许多 Numpy 内部检查和调用),专门为解决您的特定情况而编写:
@nb.njit('void(float64[::1],float64[:,::1],::1])')
def fastMul(a,b,out):
val = a[0]
for i in range(b.shape[1]):
out[0,i] = b[0,i] * val
res = np.empty(b.shape,dtype=b.dtype)
%timeit fastMul(a,res)
# 397 ns ± 0.587 ns per loop (mean ± std. dev. of 7 runs,1000000 loops each)
在撰写本文时,此解决方案比所有其他解决方案都快。由于大部分时间都花在调用 Numba 和执行一些内部检查上,因此直接将 Numba 用于包含迭代循环的函数应该会产生更快的代码。
,import numpy as np
import numba
def matmult_numpy(matrix,c):
return np.matmul(c,matrix)
@numba.jit(nopython=True)
def matmult_numba(matrix,c):
return c*matrix
if __name__ == "__main__":
large_dimension = 1000
a = np.random.random((1,large_dimension))
c = np.random.random((1,))
使用 Numba 大约有 3 倍的加速。 Numba cognoscenti 可以通过将参数“c”显式转换为标量来做得更好
检查:
的结果%timeit matmult_numpy(a,c)
2.32 µs 每个循环 ± 50 ns(7 次运行的平均值 ± 标准偏差,每次 100000 次循环)
%timeit matmult_numba(a,c)
763 ns ± 6.67 ns 每个循环(平均值 ± 标准偏差。7 次运行,每次 1000000 次循环)
large_dimension = 1000
a = np.random.random((1,))
B = np.random.random((1,large_dimension))
%timeit np.matmul(a,B)
5.43 µs ± 22 ns per loop (mean ± std. dev. of 7 runs,100000 loops each)
%timeit a[0] * B
5.11 µs ± 6.92 ns per loop (mean ± std. dev. of 7 runs,100000 loops each)
只使用浮动
%timeit float(a[0]) * B
3.48 µs ± 26.1 ns per loop (mean ± std. dev. of 7 runs,100000 loops each)
为了避免内存分配使用“缓冲区”
buffer = np.empty_like(B)
%timeit np.multiply(float(a[0]),B,buffer)
2.96 µs ± 37.1 ns per loop (mean ± std. dev. of 7 runs,100000 loops each)
为了避免不必要的获取属性,请使用“别名”
mul = np.multiply
%timeit mul(float(a[0]),buffer)
2.73 µs ± 12.6 ns per loop (mean ± std. dev. of 7 runs,100000 loops each)
而且我根本不建议使用 numpy 标量, 因为如果你避免它,计算会更快
a_float = float(a[0])
%timeit mul(a_float,buffer)
1.94 µs ± 5.74 ns per loop (mean ± std. dev. of 7 runs,1000000 loops each)
此外,如果可能的话,那么在循环外初始化缓冲区一次(当然,如果你有类似循环的东西:)
rng = range(1000)
%%timeit
for i in rng:
pass
24.4 µs ± 1.21 µs per loop (mean ± std. dev. of 7 runs,10000 loops each)
%%timeit
for i in rng:
mul(a_float,buffer)
1.91 ms ± 2.21 µs per loop (mean ± std. dev. of 7 runs,1000 loops each)
所以,
"best_iteration_time" = (1.91 - 0.02) / 1000 => 1.89 (µs)
“加速” = 5.43 / 1.89 = 2.87