排队任务以使用Task.ContinueWith处理H.264帧会导致内存问题

问题描述

我的应用程序使用RTSPClientSharp库从相机流式传输,当解码帧准备就绪时,会引发OnFramesReceived事件。我在同一事件中将解码的帧转换为位图,这是一个阻塞调用,耗时超过100毫秒,导致帧速率降低到10 FPS。

为解决此问题,我使用了here中的任务队列代码,该任务使用Task.ContinueWith.UnWrap将ProcessFrame事件(具有将解码的帧转换为Bitmap的代码)排队。我的目标是按照我收到帧的顺序依次执行ProcessFrame调用。使用任务队列解决了阻止呼叫的问题,现在我每秒能够处理30帧。

但是,我现在遇到内存问题,如果我的应用程序运行时间更长,则内存使用率会逐渐增加。 ANTS内存分析器说(Check ScreenShot)说ContinuationResultFrom Task是Gen2中最大的类。

更新 我想介绍的一些事实是,我有10个此类摄像机连接到我的应用程序,每个摄像机都有自己的摄像机类实例。我仍然使用具有超线程和32GB RAM的16核处理器,但是,如果CPU无法处理负载,我希望将FPS降低到10。

  private void OnFramesReceived(object sender,RawFrame rawFrame)
    {
         taskQueue.Enqueue(() => Task.Run(() => ProcessFrame?.Invoke(this,decodedFrame)));           
    }

  private void HandleProcessFrame(object sender,IDecodedVideoFrame decodedFrame)
    {
        try
        { 
            using (Bitmap bmpBitmap = new Bitmap(m_Width,m_Height))
            {
                BitmapData bmpData = bmpBitmap.LockBits(new Rectangle(0,bmpBitmap.Width,bmpBitmap.Height),ImageLockMode.WriteOnly,bmpBitmap.PixelFormat);

                try
                {
                    decodedFrame.TransformTo(
                        bmpData.Scan0,bmpData.Stride,_transformParameters);
                }
                finally
                {
                    bmpBitmap.UnlockBits(bmpData);
                } 
                base.OnNewFrameEvent(this,bmpBitmap);
                decodedFrame = null;
            
            }


        }
        catch (Exception ex)
        {
            Logng.LogError(ex);
        }
    }
 public class TaskQueue
{
    private Task previous = Task.FromResult(false);
    private object key = new object();

    public Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
    {
        lock (key)
        {
            var next = previous.ContinueWith(t => taskGenerator()).Unwrap();
            previous = next;
            return next;
        }
    }
    public Task Enqueue(Func<Task> taskGenerator)
    {
        lock (key)
        {
            var next = previous.ContinueWith(t => taskGenerator(),TaskContinuationOptions.ExecuteSynchronously).Unwrap();
            previous = next;
            return next;
        }
    }
}

解决方法

通过使用延续,您将创建一个不受中央控制的队列,并且该队列的存储效率也不高。除了实际有效负载(RawFrame)之外,您还要为每个延续支付200-300字节的开销。我建议改用更有条理,更高效的工具,例如TPL Dataflow库。

下面是使用TPL Dataflow库的示例。单个ActionBlock是该库中最简单的组件,它为计算提供了强大的功能。您可以通过设置BoundedCapacity选项来配置其内部队列的大小。当队列已满时,即将到来的消息将被丢弃(Post方法将返回false)。您也可以配置MaxDegreeOfParallelism。您可以利用机器上所有可用的内核/处理器,也可以让一两个内核自由地做其他工作。

private readonly ActionBlock<RawFrame> _actionBlock;

public MyClass() // constructor
{
    _actionBlock = new ActionBlock<RawFrame>(rawFrame =>
    {
        ProcessFrame(rawFrame);
    },new ExecutionDataflowBlockOptions()
    {
        BoundedCapacity = 10,// the default is unbounded
        MaxDegreeOfParallelism = Environment.ProcessorCount,// the default is 1
    });
}

private void OnFramesReceived(object sender,RawFrame rawFrame)
{
    _actionBlock.Post(rawFrame);
}

TPL数据流库内置于.NET Core中,并且可以作为package用于.NET Framework。

相关问答

依赖报错 idea导入项目后依赖报错,解决方案:https://blog....
错误1:代码生成器依赖和mybatis依赖冲突 启动项目时报错如下...
错误1:gradle项目控制台输出为乱码 # 解决方案:https://bl...
错误还原:在查询的过程中,传入的workType为0时,该条件不起...
报错如下,gcc版本太低 ^ server.c:5346:31: 错误:‘struct...