问题描述
我目前有一个函数可以并行执行一组 10 个任务。在 10 个任务完成后,我继续进行下一个 10 个任务,直到我的队列为空。如果我的 9 个任务在 1 分钟内完成,而我的第 10 个任务又需要 10 分钟,我期待提高该算法的效率,即使我有 9 个点,我也需要等待所有 10 个任务完成免费开始使用其他 9 个任务。
有没有办法在任务完成后立即发送另一个任务以在同一级别内进行处理(对于每个循环)。我看到可以使用并发字典。能否请您指导并提供一些示例代码。
public async Task test()
{
List<task> listoftasks =new List<Task>();
foreach(level in levels)
{
Queue<Model1> queue=new Queue<Model1>(Store);
while(queue.Count>0)
{
for(int i=0;i<10;i++)
{
if(!queue.TryDequeue(out Model1 item))
{
break;
}
listoftasks.Add(Task.Run(()=>Dosomething(sql)))
}
await Task.WhenAll(listoftasks);
listoftasks .Clear();
}
}
}
解决方法
您可以仅使用 LimitedConcurrencyLevelTaskScheduler
来实现所需的行为 (https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.taskscheduler?view=net-5.0)。在这种情况下,您可以一次性推送所有任务,它们将以所需的并发级别执行(在您的情况下,并行任务不超过 10 个)。
您可以让每个任务出列一个项目。使用 ConcurrentQueue
确保线程安全。
这有点像穷人的调度程序,但它非常轻量级。
ConcurrentQueue<Model1> queue;
void Dequeue()
{
while(queue.TryDequeue(out var item))
DoSomething(item);
}
public async Task Test()
{
queue = new ConcurrentQueue<Model1>(Store);
var listoftasks = new List<Task>();
for (var i = 0; i < Environment.ProcessorCount; i++)
listoftasks.Add(Task.Run(() => Dequeue()));
await Task.WhenAll(listoftasks);
}
注意:这不处理异常,所以必须处理或吞下所有异常
,我个人会使用 ActionBlock
(来自 TPL 数据流库)。它有
- 内置于
MaxDegreeOfParallelism
- 可以轻松处理
async
IO Bound 工作负载,或非async
CPU Bound 工作负载 - 有取消支持(如果需要)
- 可以构建到更大的管道中
- 可以在多生产者环境中作为永久消费者运行
给定
private ActionBlock<Model> _processor;
设置
_processor = new ActionBlock<Model>(
DoSomething,new ExecutionDataflowBlockOptions()
{
CancellationToken = SomeCancelationTokenIfNeeded,MaxDegreeOfParallelism = 10,SingleProducerConstrained = true
});
一些方法
public static void DoSomething(Model item)
{ ... }
使用
await _processor.SendAsync(someItem);