减少python程序计算的算法时间

问题描述

我在python上使用算法计算过滤器(low_pass,high_pass ...),在我的程序中,我正在从实时设备读取数据,我需要进行处理,然后将其传输回设备。 每个数据块每10毫秒出现一次,因此,此时我必须计算尽可能多的过滤器。 我的算法

def do_calculation(self,indata,outdata):
  for i in range(0,len(indata)):
    val=self.a0*indata[i]
    val+=self.a1*self.xn1 
    val+=self.a2*self.xn2 
    val-=self.b1*self.yn1 
    val-=self.b2*self.yn2
    outdata[i]= val
    self.xn2 = self.xn1
    self.xn1 = indata[i]
    self.yn2 = self.yn1
    self.yn1 = outdata[i]

我在开始读/写之前计算的系数(a0,a1,a2,b1,b2)

我使用从设备获得的每个输入从Main函数调用此函数,对其进行处理,然后使用outdata将其写回到设备。 indata是这样的大小为512的列表 列表[[x0,x1] [x1,x2] ..... [x510,x511]]

有什么方法可以改善此功能的性能?也许python是关于它的限制。 目前,每个过滤器占用的时间大约为 2或3 (有些过滤器甚至需要5毫秒),并且我想减少它,以便可以在10毫秒的范围内创建更多过滤器。>

感谢帮助!

解决方法

下面的第三个版本do_calculation3是您的版本的两倍

class C():
    def __init__(self):
        self.a0 = .1
        self.a1 = .02
        self.a2 = -.3
        self.b1 = .2
        self.b2 = -.25
        self.xn1 = -.1
        self.xn2 = .11
        self.yn1 = .12
        self.yn2 = -.001
        self.ab = [self.a0,self.a1,self.a2,-self.b1,-self.b2]
        self.xy = [self.xn1,self.xn2,self.yn1,self.yn2]

    def do_calculation(self,indata,outdata):
      for i in range(0,len(indata)):
        val=self.a0*indata[i]
        val+=self.a1*self.xn1 
        val+=self.a2*self.xn2 
        val-=self.b1*self.yn1 
        val-=self.b2*self.yn2
        outdata[i]= val
        self.xn2 = self.xn1
        self.xn1 = indata[i]
        self.yn2 = self.yn1
        self.yn1 = outdata[i]

    def do_calculation2(self,outdata):
        xy = self.xy
        ab = self.ab
        for i,ini in enumerate(indata):
            val = sum((n * m for n,m in zip(ab,[ini] + xy)))
            outdata[i] = val
            xy = [ini,xy[0],val,xy[2]]
        self.xy = xy

    def do_calculation3(self,outdata):
        j,k,l,m,n = self.ab
        xn1,xn2,yn1,yn2 = self.xn1,self.yn2
        for i,ini in enumerate(indata):
            outdata[i] = val = j*ini + k*xn1 + l*xn2 + m*yn1 + n*yn2
            xn1,yn2 = ini,xn1,yn1
        self.xn1,self.yn2 = xn1,yn2


c = C()
dati = list((1/x if x else 0) for x in range(512))
dato = [0 for _ in dati]
c.do_calculation(dati,dato)

c2 = C()
dati2 = list((1/x if x else 0) for x in range(512))
dato2 = [0 for _ in dati2]
c2.do_calculation2(dati2,dato2)

c3 = C()
dati3 = list((1/x if x else 0) for x in range(512))
dato3 = [0 for _ in dati3]
c3.do_calculation3(dati3,dato3)

assert dato == dato2
assert dato == dato3

#%%
print('\n## do_calculation\n')
c = C()
dati = list((1/x if x else 0) for x in range(512))
dato = [0 for _ in dati]
%timeit c.do_calculation(dati,dato)

print('\n## do_calculation2\n')
c2 = C()
dati2 = list((1/x if x else 0) for x in range(512))
dato2 = [0 for _ in dati2]
%timeit c2.do_calculation2(dati2,dato2)

print('\n## do_calculation3\n')
c3 = C()
dati3 = list((1/x if x else 0) for x in range(512))
dato3 = [0 for _ in dati3]
%timeit c3.do_calculation3(dati3,dato3)

时间

## do_calculation

887 µs ± 35.9 µs per loop (mean ± std. dev. of 7 runs,1000 loops each)

## do_calculation2

1.49 ms ± 417 µs per loop (mean ± std. dev. of 7 runs,1000 loops each)

## do_calculation3

332 µs ± 83.5 µs per loop (mean ± std. dev. of 7 runs,1000 loops each)

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...