问题描述
我正在寻找一种有效的方法来执行以下操作;这是一个最小的工作代码:
import numpy as np
from scipy.signal import fftconvolve
n = 7
m = 100
N = 3000
a = np.random.rand( n,m,N ) + np.random.rand( n,N )*1j
b = np.random.rand( n,N )*1j
# we want product over the n-dimension,with fftconvolve in the m-indices elementwise
old_shape= a.shape
new_shape= ( n*m,N )
a = a.reshape( new_shape )
for i in range( n ):
b_tiled = np.tile( b[ i,:,: ],( n,1,1 )).reshape( new_shape )
result = ( fftconvolve( b_tiled,a,mode="same",axes=-1 ) ).reshape( old_shape )
result = result.sum( axis=0 )
该操作在第一个索引中以类似乘积的方式计算两个数组的 FFT(因此我避免了对 range(n) 索引的双重循环,仅使用一个)。
解决方法
参考实现
我将开始将操作包装在一个函数中,以便我可以轻松地比较实现
def ref_impl(a,b):
n,m,N = a.shape
a = a.reshape( m*n,N )
ans = []
for i in range( n ):
b_tiled = np.tile( b[ i,:,: ],( n,1,1 )).reshape( n*m,N )
result = ( fftconvolve( b_tiled,a,mode="same",axes=-1 ) ).reshape( n,N )
ans.append(result.sum( axis=0 ))
return np.array(ans);
简化平铺数组的总和
Tile 复制一个元素,求和减少一个轴上的所有元素。
请注意,b_tiled
在第一个轴上是常量,您进行了一些整形,但所有内容都对齐了,因此第一个 result[k] = fftconvolve(b[i,:],a[k,:])
因此第二个结果可以计算为
result = sum(fftconvolve(b[i,:]) for k in range(n))
由于 fftconvolve 是线性的,所以可以写成
result = fftconvolve(b[i,sum(a[k,:]for k in range(n)))
这种形式可以矢量化
def impl1(a,N = a.shape
ans = []
for i in range( n ):
result = ( fftconvolve( b[i,a.sum(axis=0),axes=-1 ) )
ans.append(result)
return np.array(ans);
简化切片堆栈
Index 在一个轴上取一个元素,stack 会构造一个包含多个元素的数组,这个操作可以用单个向量化操作代替
def impl2(a,N = a.shape
return ( fftconvolve( b,np.tile(a.sum(axis=0),(n,1)),axes=-1 ) )
复杂性
提议的实现将使用更少的暂存空间,将摆脱 for 循环,并且只会执行一个 fftconvolve
而不是 n
。总之,对于大型输入,它的运行速度会快 n
倍。