问题描述
我正在尝试理解 C# 中的任务,但仍有一些问题。我正在尝试创建一个包含视频的应用程序。主要目的是从文件中读取视频(我正在使用 Emgu.CV)并通过 TCP/IP 将其发送到板中进行处理,然后以流(实时)方式返回。首先,我是连续做的。因此,读取 Bitmap
,从板上发送-接收,并绘图。但是读取位图并绘制它们需要太多时间。我想要一个传输、接收 FIFO 缓冲区来保存视频帧,以及一个不同的任务来完成发送接收每一帧的工作。所以我想同时进行。我想我应该创建 3 个任务:
tasks.Add(Task.Run(() => Video_load(video_path)));
tasks.Add(Task.Run(() => Video_Send_Recv(video_path)));
tasks.Add(Task.Run(() => Videodisp_hw(32)));
我想“并行”运行。我应该使用什么类型的对象?并发队列?缓冲块?还是只是一个列表?
多谢指教!我想问一件事。我正在尝试使用 2 个 TPL 块创建一个简单的控制台程序。 1 Block 将是 Transform 块(接收消息,即 "start" )并将数据加载到 List ,另一个块将是 ActionBlock (仅从列表中读取数据并打印它们)。代码如下:
namespace TPL_Dataflow
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Hello World!");
Random randn = new Random();
var loadData = new TransformBlock<string,List<int>>(async sample_string =>
{
List<int> input_data = new List<int>();
int cnt = 0;
if (sample_string == "start")
{
Console.WriteLine("Inside loadData");
while (cnt < 16)
{
input_data.Add(randn.Next(1,255));
await Task.Delay(1500);
Console.WriteLine("Cnt");
cnt++;
}
}
else
{
Console.WriteLine("Not started yet");
}
return input_data;
});
var PrintData = new ActionBlock<List<int>>(async input_data =>
{
while(input_data.Count > 0)
{
Console.WriteLine("output Data = " + input_data.First());
await Task.Delay(1000);
input_data.RemoveAt(0);
}
});
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
loadData.LinkTo(PrintData,input_data => input_data.Count() >0 );
//loadData.LinkTo(PrintData,linkOptions);
loadData.SendAsync("start");
loadData.Complete();
PrintData.Completion.Wait();
}
}
}
但它似乎以串行方式工作..我做错了什么?我尝试异步执行 while 循环。我想同时做两件事。当列表中的数据可用时,然后绘制。
解决方法
您可以使用 TransformManyBlock<string,int>
作为生产者块,使用 ActionBlock<int>
作为消费者块。 TransformManyBlock
将使用接受 Func<string,IEnumerable<int>>
委托的构造函数进行实例化,并传递一个 iterator method(下面示例中的 Produce
方法),该方法一一生成值:
Random random = new Random();
var producer = new TransformManyBlock<string,int>(Produce);
IEnumerable<int> Produce(string message)
{
if (message == "start")
{
int cnt = 0;
while (cnt < 16)
{
int value;
lock (random) value = random.Next(1,255);
Console.WriteLine($"Producing #{value}");
yield return value;
Thread.Sleep(1500);
cnt++;
}
}
else
{
yield break;
}
}
var consumer = new ActionBlock<int>(async value =>
{
Console.WriteLine($"Received: {value}");
await Task.Delay(1000);
});
producer.LinkTo(consumer,new() { PropagateCompletion = true });
producer.Post("start");
producer.Complete();
consumer.Completion.Wait();
不幸的是,生产者必须在产生每个值 (Thread.Sleep(1500);
) 之间的空闲期间阻塞工作线程,因为 TransformManyBlock
当前没有接受 Func<string,IAsyncEnumerable<int>>
的构造函数。这可能会在 TPL Dataflow 库的下一个版本中修复。您可以跟踪 this GitHub 问题,以了解此功能何时发布。
替代解决方案:您可以不将生产者和消费者明确链接起来,而是将它们保持不链接,并手动将生产者产生的值发送给消费者。在这种情况下,两个块都是 ActionBlock
s:
Random random = new Random();
var consumer = new ActionBlock<int>(async value =>
{
Console.WriteLine($"Received: {value}");
await Task.Delay(1000);
});
var producer = new ActionBlock<string>(async message =>
{
if (message == "start")
{
int cnt = 0;
while (cnt < 16)
{
int value;
lock (random) value = random.Next(1,255);
Console.WriteLine($"Producing #{value}");
var accepted = await consumer.SendAsync(value);
if (!accepted) break; // The consumer has failed
await Task.Delay(1500);
cnt++;
}
}
});
PropagateCompletion(producer,consumer);
producer.Post("start");
producer.Complete();
consumer.Completion.Wait();
async void PropagateCompletion(IDataflowBlock source,IDataflowBlock target)
{
try { await source.Completion.ConfigureAwait(false); } catch { }
var ex = source.Completion.IsFaulted ? source.Completion.Exception : null;
if (ex != null) target.Fault(ex); else target.Complete();
}
这种方法的主要困难在于如何将生产者的完成传播给消费者,从而最终两个块都完成。显然您不能使用 new DataflowLinkOptions { PropagateCompletion = true }
配置,因为这些块没有明确链接。您也不能手动 Complete
消费者,因为在这种情况下,它会过早地停止接受来自生产者的值。此问题的解决方案是上面示例中显示的 PropagateCompletion
方法。