高效平均移动平均 累计金额

问题描述

我有一个给定(恒定)频率的数据(整数)流。有时我需要计算不同的平均值(预定义)。我正在寻找快速高效的解决方案。

假设:

  • 采样率是恒定的(预定义),可能在 125-500 SPS 之间
  • 我需要计算的平均值是预定义的,它可能是一个或多个平均值(例如,仅过去 200 毫秒的平均值或过去 250 毫秒和过去 500 毫秒的平均值)。可能有很多平均值,但它们是预定义的!
  • 我需要能够随时计算当前平均值(实时)

我现在拥有的:

  • 我假设在特定的时间范围内总是会有相同数量的数据。所以频率为 100SPS 我假设一秒正好包含 100 个值
  • 创建长度恒定的队列(类似于缓冲区)
  • 对于每个定义的平均值,都会创建 Sum 变量
  • 每次新样品到达时,我都会将其放入队列中。
  • 每次队列中有新样本时,我都会将其值添加到我拥有的每个 Sum 变量中,并删除窗口外元素的值(基于队列中的位置)
  • 一旦我需要计算平均值,我只需将特定的 Sum 变量除以该 Sum 应包含的元素数

为了让您更深入地了解,我现在有一个代码

public class Buffer<T> : LinkedList<T>
{
    private readonly int capacity;

    public bool IsFull => Count >= capacity;

    public Buffer(int capacity)
    {
        this.capacity = capacity;
    }

    public void Enqueue(T item)
    {
        if (Count == capacity)
        {
            RemoveFirst();
        }
        AddLast(item);
    }
}


public class MovingAverage
{
    private readonly Buffer<float> Buffer;
    private static readonly object bufferLock = new object();
    public Dictionary<string,float> Sums { get; private set; }
    public Dictionary<string,int> Counts { get; private set; }

    public MovingAverage(List<int> sampleCounts,List<string> names)
    {
        if (sampleCounts.Count != names.Count)
        {
            throw new ArgumentException("Wrong Moving Averages parameters");
        }
        Buffer = new Buffer<float>(sampleCounts.Max());

        Sums = new Dictionary<string,float>();
        Counts = new Dictionary<string,int>();

        for (int i = 0; i < names.Count; i++)
        {
            Sums[names[i]] = 0;
            Counts[names[i]] = sampleCounts[i];
        }
    }


    public void ProcessAveraging(float val)
    {
        lock (bufferLock)
        {
            if (float.IsNaN(val))
            {
                val = 0;
            }
            foreach (var keyval in Counts.OrderBy(a => a.Value))
            {
                Sums[keyval.Key] += val;
                if (Buffer.Count >= keyval.Value)
                {
                    Sums[keyval.Key] -= Buffer.ElementAt(Buffer.Count - keyval.Value);
                }

            }
            Buffer.Enqueue(val);
        }
    }

    public float GetLastAverage(string averageName)
    {
        lock (bufferLock)
        {
            if (Buffer.Count >= Counts[averageName])
            {
                return Sums[averageName] / Counts[averageName];
            }
            else
            {
                return Sums[averageName] / Buffer.Count;
            }
        }
    }
}

这真的很好用,速度也足够快,但在现实世界中,100 SPS 并不意味着你总是在 1 秒内有 100 个样本。有时是 100,有时是 99,有时是 101。计算这些平均值对我的系统至关重要,1 个样本或多或少可能会发生很大变化。这就是为什么我需要一个实时计时器来告诉我样本是否已经超出移动平均窗口。

为每个样本添加时间戳的想法似乎很有希望

解决方法

我不使用链表,而是使用一些内部函数作为数组副本。在这个答案中,我为您的缓冲区类包含了一个可能的重写。接管了在每个位置保持总和的想法。

这个缓冲区跟踪所有的总和,但为了做到这一点,它需要用新值对每个项目求和。根据您需要获得该平均值的频率,最好在您需要时进行总结并仅保留单个值。

无论如何,我只是想指出如何使用 Array.Copy 来做到这一点

public class BufferSum
{
    private readonly int _capacity;
    private readonly int _last;
    private float[] _items;

    public int Count { get; private set; }

    public bool IsFull => Count >= _capacity;

    public BufferSum(int capacity)
    {
        _capacity = capacity;
        _last = capacity - 1;
        _items = new float[_capacity];
    }

    public void Enqueue(float item)
    {
        if (Count == _capacity)
        {
            Array.Copy(_items,1,_items,_last);
            _items[_last] = 0;
        }
        else
        {
            Count++;
        }

        for (var i = 0; i < Count; i ++)
        {
            _items[i] += item;
        }
    }

    public float Avarage => _items[0] / Count;

    public float AverageAt(int ms,int fps)
    {
        var _pos = Convert.ToInt32(ms / 1000 * fps);
        return _items[Count - _pos] / _pos; 
    }
}

另外要注意 lock 语句,这将花费很多时间。

,

这里有很多答案..不妨再添加一个:)

这个可能需要一些小的调试来“一对一”等 - 我没有一个真正的数据集可以使用,所以也许把它当作伪代码

就像你的一样:有一个圆形的缓冲区 - 给它足够的容量来保存 N 个样本,其中 N 足以检查你的移动平均线 - 100 SPS 并且想要检查 250ms 我认为你至少需要 25,但是我们不缺空间,所以你可以把它做得更多

struct Cirray
{
    long _head;
    TimedFloat[] _data;

    public Cirray(int capacity)
    {
        _head = 0;
        _data = new TimedFloat[capacity];
    }

    public void Add(float f)
    {
        _data[_head++%_data.Length] = new TimedFloat() { F = f };
    }

    public IEnumerable<float> GetAverages(int[] forDeltas)
    {
        double sum = 0;
        long start = _head - 1;
        long now = _data[start].T;
        int whichDelta = 0;

        for (long idx = start; idx >= 0 && whichDelta < forDeltas.Length; idx--)
        {
            if (_data[idx % _data.Length].T < now - forDeltas[whichDelta])
            {
                yield return (float)(sum / (start - idx));
                whichDelta++;
            }

            sum += _data[idx % _data.Length].F;
        }
    }
}

struct TimedFloat
{
    [DllImport("Kernel32.dll",CallingConvention = CallingConvention.Winapi)]
    private static extern void GetSystemTimePreciseAsFileTime(out long filetime);


    private float _f;
    public float F { get => _f;
        set {
            _f = value;
            GetSystemTimePreciseAsFileTime(out long x);
            T = DateTime.FromFileTimeUtc(x).Ticks;
        }
    }
    public long T;

}

正常的 DateTime.UtcNow 不是很精确 - 大约 16 毫秒 - 因此,如果您说即使是一个样本也可能将其丢弃,那么对这样的数据进行时间戳处理可能没有好处。相反,如果您的系统支持它(如果不支持,您可能需要更改系统,或滥用 StopWatch 类来提供更高分辨率的补充),我们可以使用它来获得与高分辨率计时器等效的滴答声,并且我们正在添加时间戳每个数据项。

我考虑过维护 N 个不断移动的指针到数据的各个尾端的复杂性,并减少/增加 N 个总和 - 它仍然可以完成(并且您清楚地知道如何完成),但是您的问题已阅读就像您可能很少要求平均值一样,N sums/counts 解决方案将花费更多的时间来维护计数,而不是时不时地运行 250 或 500 个浮点数然后将它们相加。因此,GetAverages 需要一组 ticks(每毫秒 10,000 次)的数据范围,例如new[] { 50 * 10000,100 * 10000,150 * 10000,200 * 10000,250 * 10000 } 以 50 为步长持续 50 毫秒到 250 毫秒,它从当前头部开始并向后求和,直到它将打破时间边界(这可能是一个位),然后它产生该时间跨度的平均值,然后在下一个时间跨度继续求和和计数(开始的数学减去当前索引给出的计数)..我想我理解正确,你想要例如“过去 50 毫秒的平均值”和“过去 100 毫秒的平均值”,而不是“最近 50 毫秒的平均值”和“最近之前 50 毫秒的平均值”

编辑:

想了想,就这样做了:

结构体 { 长_头; TimedFloat[] _data; 运行平均值[] _ravgs;

    public Cirray(int capacity)
    {
        _head = 0;
        _data = new TimedFloat[capacity];
    }

    public Cirray(int capacity,int[] deltas) : this(capacity)
    {
        _ravgs = new RunningAverage[deltas.Length];
        for (int i = 0; i < deltas.Length; i++)
            _ravgs[i] = new RunningAverage() { OverMilliseconds = deltas[i] };
    }

    public void Add(float f)
    {
        //in c# every assignment returns the assigned value; capture it for use later
        var addedTF = (_data[_head++ % _data.Length] = new TimedFloat() { F = f });

        if (_ravgs == null)
            return;

        foreach (var ra in _ravgs)
        {
            //add the new tf to each RA
            ra.Count++;
            ra.Total += addedTF.F;

            //move the end pointer in the RA circularly up the array,subtracting/uncounting as we go
            var boundary = addedTF.T - ra.OverMilliseconds; 
            while (_data[ra.EndPointer].T < boundary) //while the sample is timed before the boundary,move the
            {
                ra.Count--; 
                ra.Total -= _data[ra.EndPointer].F;
                ra.EndPointer = (ra.EndPointer + 1) % _data.Length; //circular indexing
            }
        }

    }

    public IEnumerable<float> GetAverages(int[] forDeltas)
    {
        double sum = 0;
        long start = _head - 1;
        long now = _data[start].T;
        int whichDelta = 0;

        for (long idx = start; idx >= 0 && whichDelta < forDeltas.Length; idx--)
        {
            if (_data[idx % _data.Length].T < now - forDeltas[whichDelta])
            {
                yield return (float)(sum / (start - idx));
                whichDelta++;
            }

            sum += _data[idx % _data.Length].F;
        }
    }

    public IEnumerable<float> GetAverages() //from the built ins
    {
        foreach (var ra in _ravgs)
        {
            if (ra.Count == 0)
                yield return 0;
            else
                yield return (float)(ra.Total / ra.Count);
        }
    }
}

绝对没有测试过,但在评论中体现了我的想法

,

创建一个大小为 500 的数组,int counter c

For every sample:
    summ -= A[c % 500]  //remove old value
    summ += sample 
    A[c % 500] = sample  //replace it with new value
    c++
    if needed,calculate
        average = summ / 500
,

您总是希望删除序列一侧最旧的元素并在序列的另一侧添加新元素:您需要一个队列而不是堆栈。

我认为圆形列表会更快:只要您没有最大尺寸,只需添加元素,一旦达到最大尺寸,替换最旧的元素。

这似乎是一个不错的可重用类。稍后我们将添加移动平均部分。

class RoundArray<T>
{
    public RoundArray(int maxSize)
    {
        this.MaxSize = maxSize;
        this.roundArray = new List<T>(maxSize);
    }

    private readonly int maxSize;
    private readonly List<T> roundArray;
    public int indexOldestItem = 0;

    public void Add(T item)
    {
        // if list not full,just add
        if (this.roundArray.Count < this.maxSize)
            this.roundArray.Add(item);
        else
        {
            // list is full,replace the oldest item:
            this.roundArray[oldestItem] = item;
            oldestItem = (oldestItem + 1) % this.maxSize;
        } 

        public int Count => this.roundArray.Count;
        public T Oldest => this.roundArray[this.indexOldestItem];               
    }
}

为了使这个类有用,添加方法来枚举数据,从最旧的或最新的开始,考虑添加其他有用的可重用方法。也许你应该实现IReadOnlyCollection<T>。也许一些私有字段应该有公共属性。

您的移动平均计算器将使用此 RoundArray。每当添加一个项目,并且您的 roundArray 尚未满时,该项目就会添加到 sum 和 round 数组中。

如果 roundArray 已满,则该项目替换最旧的项目。您从 Sum 中减去 OldestItem 的值,然后将新 Item 添加到 Sum。

class MovingAverageCalculator
{
    public MovingAverageCalculator(int maxSize)
    {
        this.roundArray = new RoundArray<int>(maxSize);
    }

    private readonly RoundArray<int> roundArray;
    private int sum = 0;

    private int Count => this.RoundArray.Count;
    private int Average => this.sum / this.Count;

    public voidAdd(int value)
    {
        if (this.Count == this.MaxSize)
        {
            // replace: remove the oldest value from the sum and add the new one
            this.Sum += value - this.RoundArray.Oldest;
        }
        else
        {
            // still building: just add the new value to the Sum
            this.Sum  += value;
        }
        this.RoundArray.Add(value);
    }
}
,

累计金额。

为每个大约 1000 个元素的块计算一系列累积和1。 (可能会更少,但是 500 或 1000 没有太大区别,这会更舒服)只要里面至少有一个元素是相关的,您就希望保留每个块。然后就可以回收了。2

当您需要当前金额并且您在一个区块内时,您需要的金额是:
block[max_index] - block[last_relevant_number]

对于当您按此顺序位于两个块 b1,b2 的边界线的情况下,您想要的总和为:
b1[b1.length - 1] - b1[last_relevant_number] + b2[max_index]

我们已经完成了。这种方法的主要优点是您无需事先知道要保留多少个元素,并且可以随时随地计算结果。
您也不需要处理元素的删除,因为您在回收段时自然会覆盖它们 - 保留索引就是您所需要的。

示例:让我们有一个常数时间序列 ts = [1,.... 1]。系列的累积总和将为 cumsum = [1,2,3 ... n]。从 ts 的第 i 个元素到第 j 个(包含)元素的总和将是 cumsum[j] - cumsum[i - 1] = j - i - 1。对于 i = 5,j = 6 它将是 6 - 4 = 2 这是正确的。


1 对于数组 [1,3,4,5] 这些将是 [1,6,10,15] - 只是为了完整性。
2 既然你提到了 ~500 个元素,那么两个块应该就足够了。