Semaphore blocks even though it is not full

Problem description

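I'm currently trying to optimize an old, badly written class that processes a lot of data, so a single data set can easily take several hours. Collecting the data alone already takes a long time, and that is the part I'm trying to improve here. I know this is smelly code, but this is only a test to see whether it improves anything at all, so please focus on the actual problem: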

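I tried using SemaphoreSlim and Semaphore to reduce the number of tasks running at the same time. My data set generates about 70 tasks, which can lead to thread starvation and poor overall performance; at the very least, the application became unresponsive. So I tried to limit it to 5 concurrent tasks to improve overall throughput.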

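Now, when a task tries to enter the semaphore, it blocks (awaiting a SemaphoreSlim blocks too) and never gets in, even though the semaphore is not full. For context: this code lives inside an async method.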

Semaphore throttle = new Semaphore(0,5);

try
{
    foreach (var folder in folders)
    {
        // Wait in case there are already 5 tasks running to reduce thread starvation
        collectionTasks.Add(Task.Run( () =>
        {
            // ReSharper disable once AccessToDisposedClosure
            throttle.WaitOne();
            return GetGapProfiles(folder.Value, progress, token);
        }, token).ContinueWith(
            t =>
            {
                // ReSharper disable once AccessToDisposedClosure
                throttle.Release();
                return t.Result;
            }, TaskContinuationOptions.None));
    }

    // When all are loaded concat all results into one collection
    await Task.WhenAll(collectionTasks);
}
catch (Exception ex)
{
    Log.Error(ex,"Failed to collect profiles.");
}
finally
{
    throttle.Dispose();
}

I just don't understand why it blocks and never gets into GetGapProfiles. Can anyone explain?

Solution
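The immediate reason the question's code hangs is the constructor call `new Semaphore(0,5)`. The signature is `Semaphore(int initialCount, int maximumCount)`, so an initial count of 0 means no slot is free: every `WaitOne()` waits for someone to call `Release()`, and in the question that `Release()` only happens in a continuation that runs after `GetGapProfiles`, which is never reached. `new SemaphoreSlim(0, 5)` behaves the same way. The small sketch below illustrates the difference using `WaitOne(0)`, which tries to enter without waiting and reports whether it succeeded.

```csharp
using System;
using System.Threading;

// Semaphore(initialCount, maximumCount): initialCount is how many slots are free right now.
var blocked = new Semaphore(0, 5);  // 0 free slots: WaitOne() blocks until someone calls Release()
var open = new Semaphore(5, 5);     // 5 free slots: the first five WaitOne() calls succeed immediately

// WaitOne(0) tries to enter without waiting and returns whether it got a slot.
bool enteredBlocked = blocked.WaitOne(0);
bool enteredOpen = open.WaitOne(0);

Console.WriteLine($"initialCount 0: entered = {enteredBlocked}"); // False
Console.WriteLine($"initialCount 5: entered = {enteredOpen}");    // True

if (enteredOpen) open.Release();    // hand the slot back
blocked.Dispose();
open.Dispose();
```

Even with `new Semaphore(5, 5)`, the question's loop still queues all ~70 `Task.Run` delegates and parks the surplus ones in a blocking `WaitOne()` on thread-pool threads, which is exactly the kind of thread starvation it was meant to avoid. Waiting asynchronously with `SemaphoreSlim.WaitAsync`, as below, avoids tying up those threads.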

If you need to control the degree of parallelism when using Task.WhenAll, you can combine it with a SemaphoreSlim like this:

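public Task DownloadThemAllAsync(byte maxLevelOfParallism, params Func<Task>[] taskFactories)
{
    var throttler = new SemaphoreSlim(maxLevelOfParallism);

    return Task.WhenAll(
        taskFactories.Select(async taskFactory =>
        {
            await throttler.WaitAsync();
            // Start the job only after a slot has been acquired; a Task that is
            // already running when it is passed in cannot be throttled.
            try { await taskFactory(); }
            finally { throttler.Release(); }
        }));
}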
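  1. A wrapper is created for every job.
  2. Only maxLevelOfParallism wrappers can get past throttler.WaitAsync.
    2.1) The rest have to wait, but in a non-blocking way.
  3. When a job completes, its wrapper frees a slot by calling Release.
  4. One of the waiting wrappers then gets past throttler.WaitAsync.
  5. This continues until all jobs have completed.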
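To check the steps above, the following sketch runs 20 dummy jobs through the same WaitAsync/Release pattern with a limit of 5 and records the highest number of jobs that were running at once. `RunThrottledAsync`, the dummy jobs, and the 50 ms delay are illustrative assumptions standing in for `DownloadThemAllAsync` and real work such as `GetGapProfiles`.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper with the same shape as DownloadThemAllAsync: it takes Func<Task>
// factories so that a job is only started after its wrapper has acquired a slot.
async Task RunThrottledAsync(int maxLevelOfParallelism, Func<Task>[] jobFactories)
{
    using var throttler = new SemaphoreSlim(maxLevelOfParallelism);
    await Task.WhenAll(jobFactories.Select(async job =>
    {
        await throttler.WaitAsync();      // waits without blocking a thread
        try { await job(); }              // the job starts here, not before
        finally { throttler.Release(); }  // free the slot even if the job throws
    }));
}

var gate = new object();
int running = 0, peak = 0;

// 20 dummy jobs that record how many of them are running at the same moment.
Func<Task>[] jobs = Enumerable.Range(0, 20)
    .Select(_ => (Func<Task>)(async () =>
    {
        lock (gate) { running++; peak = Math.Max(peak, running); }
        await Task.Delay(50);             // stand-in for real work such as GetGapProfiles
        lock (gate) { running--; }
    }))
    .ToArray();

await RunThrottledAsync(5, jobs);
Console.WriteLine($"peak concurrency: {peak}"); // at most 5
```

Passing factories rather than started tasks is the important design choice here: a `Task` object is usually already running by the time it exists, so a semaphore can only delay how soon it is awaited, not how soon it starts.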
Another option is a reusable throttling extension method:
public static class perTaskThrottle
{
    /// <summary>
    /// Run multiple tasks in parallel - up to concurrentTasks tasks may run at any one time
    /// </summary>
    /// <typeparam name="TInput"></typeparam>
    /// <typeparam name="TResult"></typeparam>
    /// <param name="sourceItems"></param>
    /// <param name="func"></param>
    /// <param name="concurrentTasks"></param>
    /// <returns></returns>
    public static Task<IDictionary<TInput,TResult>> ForEachAsyncThrottled<TInput,TResult>(
        this IEnumerable<TInput> sourceItems,Func<TInput,Task<TResult>> func,int concurrentTasks = 1)
    {
        return ForEachAsyncThrottled(sourceItems,func,CancellationToken.None,concurrentTasks);
    }

    /// <summary>
    /// Run multiple tasks in parallel - up to concurrentTasks tasks may run at any one time
    /// </summary>
    /// <typeparam name="TInput"></typeparam>
    /// <typeparam name="TResult"></typeparam>
    /// <param name="sourceItems"></param>
    /// <param name="func"></param>
    /// <param name="token"></param>
    /// <param name="concurrentTasks"></param>
    /// <returns></returns>
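    public static async Task<IDictionary<TInput,TResult>> ForEachAsyncThrottled<TInput,TResult>(
        this IEnumerable<TInput> sourceItems,Func<TInput,Task<TResult>> func,CancellationToken token,int concurrentTasks = 1)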
    {
        var result = new ConcurrentDictionary<TInput,TResult>();

        var tasksList = new List<Task>();
        using (var semaphoreSlim = new SemaphoreSlim(concurrentTasks))
        {
            foreach (var item in sourceItems)
            {
                token.ThrowIfCancellationRequested();

                // if there are already concurrentTasks tasks executing, pause until one has completed (semaphoreSlim.Release())
                await semaphoreSlim.WaitAsync(Timeout.InfiniteTimeSpan, token).ConfigureAwait(false);

                token.ThrowIfCancellationRequested();

                Action<Task<TResult>> okContinuation = async task =>
                {
                    // the task has already completed if its status is RanToCompletion, but using await once more is safer than using task.Result
                    var taskResult = await task;
                    result[item] = taskResult;
                };

                // ReSharper disable once AccessToDisposedClosure
                Action<Task> allContinuation = task => semaphoreSlim.Release();

                tasksList.Add(func.Invoke(item)
                    .ContinueWith(okContinuation,TaskContinuationOptions.OnlyOnRanToCompletion)
                    .ContinueWith(allContinuation,token));

                token.ThrowIfCancellationRequested();
            }

            if (!token.IsCancellationRequested)
            {
                await Task.WhenAll(tasksList).ConfigureAwait(false);
            }
        }

        return result;
    }
}

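You can then call it like this (assuming GetGapProfiles returns a Task; if it is synchronous, wrap the call in Task.Run):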

var results = await folders.ForEachAsyncThrottled(f => GetGapProfiles(f.Value, progress, token), token, 5);
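For comparison, here is a sketch of the same contract as ForEachAsyncThrottled (results keyed by input item, at most `concurrentTasks` calls to `func` in flight) written with async/await and try/finally instead of `ContinueWith`. `ForEachThrottledAsync` and `FakeGetGapProfilesAsync` are hypothetical names used only for this illustration; the fake just returns the length of each folder name after a short delay.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical async/await variant of ForEachAsyncThrottled.
async Task<IDictionary<TInput, TResult>> ForEachThrottledAsync<TInput, TResult>(
    IEnumerable<TInput> sourceItems, Func<TInput, Task<TResult>> func,
    CancellationToken token, int concurrentTasks = 1) where TInput : notnull
{
    var result = new ConcurrentDictionary<TInput, TResult>();
    using var semaphore = new SemaphoreSlim(concurrentTasks);

    async Task RunOneAsync(TInput item)
    {
        // Throws OperationCanceledException if the token is cancelled while waiting.
        await semaphore.WaitAsync(token).ConfigureAwait(false);
        try
        {
            result[item] = await func(item).ConfigureAwait(false);
        }
        finally
        {
            semaphore.Release(); // always give the slot back, even if func throws
        }
    }

    // Unlike the ContinueWith version, an exception from func surfaces here.
    await Task.WhenAll(sourceItems.Select(RunOneAsync)).ConfigureAwait(false);
    return result;
}

// Hypothetical stand-in for GetGapProfiles.
async Task<int> FakeGetGapProfilesAsync(string folder)
{
    await Task.Delay(10);
    return folder.Length;
}

var folders = new[] { "alpha", "beta", "gamma" };
var results = await ForEachThrottledAsync(
    folders, f => FakeGetGapProfilesAsync(f), CancellationToken.None, 2);

foreach (var pair in results)
    Console.WriteLine($"{pair.Key}: {pair.Value}");
```

One trade-off: this version creates a waiting wrapper for every item up front, whereas ForEachAsyncThrottled awaits the semaphore inside its loop and therefore only enumerates the source as slots become free.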