异步任务,视频缓冲

问题描述

我正在尝试理解 C# 中的任务,但仍有一些问题。我正在尝试创建一个包含视频的应用程序。主要目的是从文件中读取视频(我正在使用 Emgu.CV)并通过 TCP/IP 将其发送到板中进行处理,然后以流(实时)方式返回。首先,我是连续做的。因此,读取 Bitmap,从板上发送-接收,并绘图。但是读取位图并绘制它们需要太多时间。我想要一个传输、接收 FIFO 缓冲区来保存视频帧,以及一个不同的任务来完成发送接收每一帧的工作。所以我想同时进行。我想我应该创建 3 个任务:

        tasks.Add(Task.Run(() => Video_load(video_path)));
        tasks.Add(Task.Run(() => Video_Send_Recv(video_path)));
        tasks.Add(Task.Run(() => Videodisp_hw(32)));

我想“并行”运行。我应该使用什么类型的对象?并发队列?缓冲块?还是只是一个列表?

多谢指教!我想问一件事。我正在尝试使用 2 个 TPL 块创建一个简单的控制台程序。 1 Block 将是 Transform 块(接收消息,即 "start" )并将数据加载到 List ,另一个块将是 ActionBlock (仅从列表中读取数据并打印它们)。代码如下:

namespace TPL_Dataflow
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("Hello World!");
            Random randn = new Random();

            var loadData = new TransformBlock<string,List<int>>(async sample_string =>
           {
               List<int> input_data = new List<int>();
               int cnt = 0;

                if (sample_string == "start")
                {
                   Console.WriteLine("Inside loadData");
                   while (cnt < 16)
                   {
                       input_data.Add(randn.Next(1,255));
                       await Task.Delay(1500);
                       Console.WriteLine("Cnt");
                       cnt++;
                   }
                                    }
                else
                {
                    Console.WriteLine("Not started yet");

                }
            return input_data;
           });


            var PrintData = new ActionBlock<List<int>>(async input_data =>
            {
                while(input_data.Count > 0)
                {


                    Console.WriteLine("output Data = " + input_data.First());
                    await Task.Delay(1000);
                    input_data.RemoveAt(0);
                    
                }
 

              });

            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
            loadData.LinkTo(PrintData,input_data => input_data.Count() >0  );
            //loadData.LinkTo(PrintData,linkOptions);
            
            loadData.SendAsync("start");
            loadData.Complete();
            PrintData.Completion.Wait();

        }
    }
}

但它似乎以串行方式工作..我做错了什么?我尝试异步执行 while 循环。我想同时做两件事。当列表中的数据可用时,然后绘制。

解决方法

您可以使用 TransformManyBlock<string,int> 作为生产者块,使用 ActionBlock<int> 作为消费者块。 TransformManyBlock 将使用接受 Func<string,IEnumerable<int>> 委托的构造函数进行实例化,并传递一个 iterator method(下面示例中的 Produce 方法),该方法一一生成值:

Random random = new Random();

var producer = new TransformManyBlock<string,int>(Produce);

IEnumerable<int> Produce(string message)
{
    if (message == "start")
    {
        int cnt = 0;
        while (cnt < 16)
        {
            int value;
            lock (random) value = random.Next(1,255);
            Console.WriteLine($"Producing #{value}");
            yield return value;
            Thread.Sleep(1500);
            cnt++;
        }
    }
    else
    {
        yield break;
    }
}

var consumer = new ActionBlock<int>(async value =>
{
    Console.WriteLine($"Received: {value}");
    await Task.Delay(1000);
});

producer.LinkTo(consumer,new() { PropagateCompletion = true });

producer.Post("start");
producer.Complete();
consumer.Completion.Wait();

不幸的是,生产者必须在产生每个值 (Thread.Sleep(1500);) 之间的空闲期间阻塞工作线程,因为 TransformManyBlock 当前没有接受 Func<string,IAsyncEnumerable<int>> 的构造函数。这可能会在 TPL Dataflow 库的下一个版本中修复。您可以跟踪 this GitHub 问题,以了解此功能何时发布。


替代解决方案:您可以不将生产者和消费者明确链接起来,而是将它们保持不链接,并手动将生产者产生的值发送给消费者。在这种情况下,两个块都是 ActionBlocks:

Random random = new Random();

var consumer = new ActionBlock<int>(async value =>
{
    Console.WriteLine($"Received: {value}");
    await Task.Delay(1000);
});

var producer = new ActionBlock<string>(async message =>
{
    if (message == "start")
    {
        int cnt = 0;
        while (cnt < 16)
        {
            int value;
            lock (random) value = random.Next(1,255);
            Console.WriteLine($"Producing #{value}");
            var accepted = await consumer.SendAsync(value);
            if (!accepted) break; // The consumer has failed
            await Task.Delay(1500);
            cnt++;
        }
    }
});

PropagateCompletion(producer,consumer);

producer.Post("start");
producer.Complete();
consumer.Completion.Wait();

async void PropagateCompletion(IDataflowBlock source,IDataflowBlock target)
{
    try { await source.Completion.ConfigureAwait(false); } catch { }
    var ex = source.Completion.IsFaulted ? source.Completion.Exception : null;
    if (ex != null) target.Fault(ex); else target.Complete();
}

这种方法的主要困难在于如何将生产者的完成传播给消费者,从而最终两个块都完成。显然您不能使用 new DataflowLinkOptions { PropagateCompletion = true } 配置,因为这些块没有明确链接。您也不能手动 Complete 消费者,因为在这种情况下,它会过早地停止接受来自生产者的值。此问题的解决方案是上面示例中显示的 PropagateCompletion 方法。