问题描述
当前我的代码是
self.df['sma'] = self.df['Close'].rolling(window=30).mean()
self.df['cma'] = self.df.apply(lambda x: self.get_cma(x),axis=1)
def get_cma(self,candle):
if np.isnan(candle['sma']):
return np.nan
secma = (candle['sma'] - self.prevIoUs_cma if self.prevIoUs_cma is not None else 0) ** 2
ka = 1 - (candle['var']/secma) if candle['var'] < secma else 0
cma = ((ka * candle['sma']) + ((1 - ka) * self.prevIoUs_cma)) if self.prevIoUs_cma is not None else candle[self.src]
self.prevIoUs_cma = cma
return cma
以上内容可以优化以使其更快吗?
解决方法
您可能已经知道,Pandas表演的秘诀就是以矢量形式进行。这意味着没有apply
。通过将get_cma()
函数的某些部分提取为其向量等效项,可以采取以下几步来加快代码速度。
if np.isnan(candle['sma']):
return np.nan
get_cma()
中不需要提前退出,我们可以这样做:
self.df['cma'] = np.nan
valid = self.df['sma'].notnull()
# this comment is a placeholder for step 2
self.df.loc[valid,'cma'] = self.df[valid].apply(self.get_cma,axis=1)
这不仅矢量化了get_cma()
的前两行,这意味着get_cma()
现在仅在非空行而不是每一行上被调用。仅取决于您的数据,即可提供明显的加速效果。
如果那还不够,我们需要更大的锤子。根本的问题是get_cma()
的每个迭代都取决于前一个迭代,因此矢量化并不容易。因此,让我们使用Numba来JIT编译代码。首先,我们需要通过在各个列上使用旧的apply
循环来摆脱for
,这是等效的(并且仍然很慢)。请注意,这是一个免费的(全局)函数,而不是成员函数,并且它使用NumPy数组而不是Pandas类型,因为这些是Numba理解的:
def get_cma(sma,var,src):
cma = np.empty_like(sma)
# take care of the initial value first,to avoid unnecessary branches later
cma[0] = src[0]
# now do all remaining rows,cma[ii-1] is previous_cma and is never None
for ii in range(1,len(sma)):
secma = (sma[ii] - cma[ii-1]) ** 2
ka = 1 - (var[ii] / secma) if var[ii] < secma else 0
cma[ii] = (ka * sma[ii]]) + ((1 - ka) * cma[ii-1])
return cma
这样调用,将所需的列作为NumPy数组传递:
valid_rows = self.df[valid]
self.df.loc[valid,'cma'] = get_cma(
valid_rows['sma'].to_numpy(),valid_rows['var'].to_numpy(),valid_rows[self.src].to_numpy())
最后,在确认代码可以正常工作之后,装饰get_cma()
以便使用Numba进行自动编译,如下所示:
import numba
@numba.njit
def get_cma(sma,src):
...
就是这样。请让我们知道这在您的真实数据上运行的速度有多快。我希望它足够快。