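Problem description
I am currently trying to optimize an old, badly written class that processes large amounts of data, so a single data set can easily take several hours. Collecting the data already takes a long time, and that is the part I am trying to improve here. I know the code smells, but this is only a test to see whether anything improves at all, so please focus on the problem itself:
I tried using SemaphoreSlim and Semaphore to reduce the number of tasks running at the same time. My data set produces roughly 70 tasks, which can lead to thread starvation and lower overall performance. At the very least, the application became noticeably unresponsive. So I tried to keep it at 5 concurrent tasks to improve overall throughput.
Now, when a task tries to wait on the semaphore, it blocks (the slim version with await blocks as well) and never gets in, even though the semaphore is not full. For context, this code lives inside an async method.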
Semaphore throttle = new Semaphore(0, 5);
try
{
    foreach (var folder in folders)
    {
        // Wait in case there are already 5 tasks running to reduce thread starvation
        collectionTasks.Add(Task.Run(() =>
        {
            // ReSharper disable once AccessToDisposedClosure
            throttle.WaitOne();
            return GetGapProfiles(folder.Value, progress, token);
        }, token).ContinueWith(
            t =>
            {
                // ReSharper disable once AccessToDisposedClosure
                throttle.Release();
                return t.Result;
            }, TaskContinuationOptions.None));
    }
    // When all are loaded concat all results into one collection
    await Task.WhenAll(collectionTasks);
}
catch (Exception ex)
{
    Log.Error(ex, "Failed to collect profiles.");
}
finally
{
    throttle.Dispose();
}
I just don't understand why it blocks and never enters GetGapProfiles. Can anyone explain?
Solution
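Regarding the blocking itself: the Semaphore constructor takes (initialCount, maximumCount), so new Semaphore(0, 5) creates a semaphore with no free slots. Every WaitOne therefore blocks until something calls Release, and in the question's code Release only runs in a continuation after the task has finished, which it never can. The following is a minimal sketch of that difference; the helper name CanEnter and the 100 ms timeout are illustrative assumptions, not part of the question.

```csharp
using System;
using System.Threading;

public static class SemaphoreInitialCountDemo
{
    // Hypothetical helper: reports whether a fresh semaphore with the given
    // counts lets a caller in within 100 ms.
    public static bool CanEnter(int initialCount, int maximumCount)
    {
        // initialCount = slots free right now, maximumCount = upper bound only.
        using (var semaphore = new Semaphore(initialCount, maximumCount))
        {
            bool entered = semaphore.WaitOne(TimeSpan.FromMilliseconds(100));
            if (entered)
            {
                semaphore.Release(); // hand the slot back before disposing
            }
            return entered;
        }
    }

    public static void Main()
    {
        Console.WriteLine(CanEnter(0, 5)); // prints False: no free slot, WaitOne times out
        Console.WriteLine(CanEnter(5, 5)); // prints True: five slots are free from the start
    }
}
```

Passing 5 as the initial count (new Semaphore(5, 5) or new SemaphoreSlim(5, 5)) lets the first five callers in immediately.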
If you need to control the degree of parallelism when using Task.WhenAll together with a semaphore, you can do the following:
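// Requires: using System.Linq; using System.Threading; using System.Threading.Tasks;
public Task DownloadThemAllAsync(byte maxLevelOfParallism, params Task[] tasks)
{
    // All maxLevelOfParallism slots start out free.
    var throttler = new SemaphoreSlim(maxLevelOfParallism);
    return Task.WhenAll(
        tasks.Select(async task =>
        {
            // Asynchronously wait for a free slot; no thread is blocked here.
            await throttler.WaitAsync();
            try { await task; }
            // Always free the slot, even if the task faulted or was canceled.
            finally { throttler.Release(); }
        }));
}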
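- We start all the tasks.
- Only maxLevelOfParallism of them can get past the throttler.WaitAsync line.
  - The remaining ones have to wait, in a non-blocking way.
- When a task completes, it frees up a slot by calling Release.
- One of the waiting tasks then gets past the throttler.WaitAsync line.
- This continues until all tasks have completed.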
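One caveat with DownloadThemAllAsync is that it receives Task objects, and a Task returned from an async method or from Task.Run is already running when it is passed in, so the semaphore limits how many are awaited at once rather than how many execute. Below is a hedged sketch of a variant that accepts factories (Func<Task>) and starts each job only after it has acquired a slot. RunThrottledAsync and MeasurePeakAsync are hypothetical names introduced for illustration; they are not part of the original answer.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class LazyThrottleSketch
{
    // Hypothetical helper: each job is a factory, so no work starts before a slot is free.
    public static async Task RunThrottledAsync(int maxParallelism, IEnumerable<Func<Task>> jobs)
    {
        using (var throttler = new SemaphoreSlim(maxParallelism, maxParallelism))
        {
            var running = jobs.Select(async job =>
            {
                await throttler.WaitAsync();   // non-blocking wait for a slot
                try { await job(); }           // the work is created and started here
                finally { throttler.Release(); }
            }).ToList();

            // WhenAll completes only after every wrapper has finished,
            // so disposing the semaphore afterwards is safe.
            await Task.WhenAll(running);
        }
    }

    // Hypothetical measurement: runs jobCount dummy jobs and records how many overlapped.
    public static async Task<int> MeasurePeakAsync(int jobCount, int maxParallelism)
    {
        int current = 0, peak = 0;
        var gate = new object();

        var jobs = Enumerable.Range(0, jobCount).Select(_ => (Func<Task>)(async () =>
        {
            lock (gate) { current++; peak = Math.Max(peak, current); }
            await Task.Delay(20); // simulated work
            lock (gate) { current--; }
        }));

        await RunThrottledAsync(maxParallelism, jobs);
        return peak;
    }

    public static async Task Main()
    {
        int peak = await MeasurePeakAsync(jobCount: 70, maxParallelism: 5);
        Console.WriteLine($"peak concurrency: {peak}"); // never more than 5
    }
}
```

Because each job decrements the counter before its slot is released, the recorded peak cannot exceed maxParallelism.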
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class perTaskThrottle
{
    /// <summary>
    /// Run multiple tasks in parallel - up to concurrentTasks tasks may run at any one time
    /// </summary>
    /// <typeparam name="TInput">Type of the source items</typeparam>
    /// <typeparam name="TResult">Type of the result produced for each item</typeparam>
    /// <param name="sourceItems">Items to process</param>
    /// <param name="func">Asynchronous function invoked once per item</param>
    /// <param name="concurrentTasks">Maximum number of tasks running at once</param>
    /// <returns>A dictionary mapping each item to its result</returns>
    public static Task<IDictionary<TInput,TResult>> ForEachAsyncThrottled<TInput,TResult>(
        this IEnumerable<TInput> sourceItems, Func<TInput,Task<TResult>> func, int concurrentTasks = 1)
    {
        return ForEachAsyncThrottled(sourceItems, func, CancellationToken.None, concurrentTasks);
    }

    /// <summary>
    /// Run multiple tasks in parallel - up to concurrentTasks tasks may run at any one time
    /// </summary>
    /// <typeparam name="TInput">Type of the source items</typeparam>
    /// <typeparam name="TResult">Type of the result produced for each item</typeparam>
    /// <param name="sourceItems">Items to process</param>
    /// <param name="func">Asynchronous function invoked once per item</param>
    /// <param name="token">Token used to cancel scheduling of further items</param>
    /// <param name="concurrentTasks">Maximum number of tasks running at once</param>
    /// <returns>A dictionary mapping each successfully processed item to its result</returns>
    public static async Task<IDictionary<TInput,TResult>> ForEachAsyncThrottled<TInput,TResult>(
        this IEnumerable<TInput> sourceItems, Func<TInput,Task<TResult>> func, CancellationToken token, int concurrentTasks = 1)
    {
        var result = new ConcurrentDictionary<TInput,TResult>();
        var tasksList = new List<Task>();
        using (var semaphoreSlim = new SemaphoreSlim(concurrentTasks))
        {
            foreach (var item in sourceItems)
            {
                token.ThrowIfCancellationRequested();
                // if there are already concurrentTasks tasks executing, pause until one has completed ( semaphoreSlim.Release() )
                // Timeout.InfiniteTimeSpan: wait indefinitely, or until the token is canceled
                await semaphoreSlim.WaitAsync(Timeout.InfiniteTimeSpan, token).ConfigureAwait(false);
                token.ThrowIfCancellationRequested();
                Action<Task<TResult>> okContinuation = async task =>
                {
                    // the task has already completed if status is CompletedOk, but using await once more is safer than using task.Result
                    var taskResult = await task;
                    result[item] = taskResult;
                };
                // ReSharper disable once AccessToDisposedClosure
                Action<Task> allContinuation = task => semaphoreSlim.Release();
                // okContinuation stores the result only on success; allContinuation frees the slot either way
                tasksList.Add(func.Invoke(item)
                    .ContinueWith(okContinuation, TaskContinuationOptions.OnlyOnRanToCompletion)
                    .ContinueWith(allContinuation, token));
                token.ThrowIfCancellationRequested();
            }
            if (!token.IsCancellationRequested)
            {
                await Task.WhenAll(tasksList).ConfigureAwait(false);
            }
        }
        return result;
    }
}
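So you can use it like this (this assumes GetGapProfiles returns a Task<TResult>; if it is synchronous, wrap the call in Task.Run):
var results = await folders.ForEachAsyncThrottled(f => GetGapProfiles(f.Value), token, 5);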
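For comparison, the question's original loop can also be repaired with much smaller changes: start the semaphore with five free slots and release in a finally block instead of a continuation. The sketch below is one possible shape under stated assumptions. GetGapProfiles here is a hypothetical synchronous stand-in (the real signature is not shown in the question), and the folder dictionary and string results are made up for the demonstration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledCollectionSketch
{
    // Hypothetical stand-in for the question's GetGapProfiles.
    private static List<string> GetGapProfiles(string folder, CancellationToken token)
    {
        token.ThrowIfCancellationRequested();
        Thread.Sleep(10); // simulate blocking work
        return new List<string> { folder + "/profile" };
    }

    public static async Task<List<string>> CollectAsync(
        IReadOnlyDictionary<string, string> folders, CancellationToken token)
    {
        var collectionTasks = new List<Task<List<string>>>();
        using (var throttle = new SemaphoreSlim(5, 5)) // 5 slots free now, 5 at most
        {
            foreach (var folder in folders)
            {
                collectionTasks.Add(Task.Run(async () =>
                {
                    // Asynchronous wait: no thread-pool thread is parked while queued.
                    await throttle.WaitAsync(token);
                    try { return GetGapProfiles(folder.Value, token); }
                    finally { throttle.Release(); }
                }));
            }

            // When all are loaded, concat all results into one collection.
            var results = await Task.WhenAll(collectionTasks);
            return results.SelectMany(r => r).ToList();
        }
    }

    public static async Task Main()
    {
        var folders = Enumerable.Range(0, 70)
            .ToDictionary(i => "key" + i, i => "folder" + i);
        var profiles = await CollectAsync(folders, CancellationToken.None);
        Console.WriteLine($"collected {profiles.Count} profiles"); // prints "collected 70 profiles"
    }
}
```

Releasing in finally means a slot is returned even when GetGapProfiles throws, which the ContinueWith-based version only achieves indirectly.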