我在 Parallel.ForEach 循环中收到 TaskCanceledException,如何解决?

问题描述

我在 C# 中运行 Parallel.ForEach 时遇到了下面的异常:TaskCanceledException: A task was canceled。有人能告诉我应该从哪里着手解决任务被取消的问题,或者如何增加连接超时时间吗?

我的代码

// Entry point: clears the previous run's RBAC status/staging rows, reads the
// production subscription IDs from the Subscriptions table, then takes an RBAC
// snapshot of every subscription in parallel.
private static void Main()
{
    var sqlServerName = "XXXXXXXXXX.database.windows.net";
    var sqlServerAdmin = "XXXXXXXXXX";
    var sqlServerAdminPasword = "XXXXXXXXXX";
    var DatabaseName = "XXXXXXXXXX";

    // Reset the status rows and the staging table before a new run.
    string sqlStatement = ($"DELETE FROM ResourceStatus WHERE ResourceType = 'RBAC' " +
        $"TruncATE TABLE RBACStaging ");
    Helper.ExecuteTsql(sqlServerName,sqlServerAdmin,sqlServerAdminPasword,DatabaseName,sqlStatement);

    SqlConnectionStringBuilder builder = new SqlConnectionStringBuilder();

    builder.DataSource = sqlServerName;
    builder.UserID = sqlServerAdmin;
    builder.Password = sqlServerAdminPasword;
    builder.InitialCatalog = DatabaseName;
    List<string> Subscriptions = new List<string>();

    using (SqlConnection connection = new SqlConnection(builder.ConnectionString))
    {
        connection.Open();
        string sql = "SELECT SubscriptionID from Subscriptions " +
            "where SOX = 'SR0' and Environment = 'Prod'";

        using (SqlCommand command = new SqlCommand(sql,connection))
        using (SqlDataReader Reader = command.ExecuteReader())
        {
            while (Reader.Read())
            {
                Subscriptions.Add(Reader.GetString(0));
            }
        }
        // No explicit Close()/ClearPool(): leaving the using block disposes the
        // connection and returns it to the pool, and clearing the pool would only
        // force later callers to open new physical connections.
    }

    Parallel.ForEach(Subscriptions,s =>
    {
        string SubscriptionID = s.Replace(" ",String.Empty);
        // Blocks this worker until the snapshot finishes. A failed or canceled
        // snapshot surfaces from Wait() (and from Parallel.ForEach) wrapped in an
        // AggregateException.
        RBACSnapshot.GetRBACSnapshot(SubscriptionID).Wait();
    });
}

GetRBACSnapshot 方法

/// <summary>
/// Takes an RBAC snapshot of one subscription: marks the run as started in
/// ResourceStatus, loads the subscription's metadata from the Subscriptions table,
/// lists its role assignments through the Azure management REST API, writes one
/// RBACStaging row per assignment, and finally marks the run as completed.
/// </summary>
/// <param name="SubscriptionID">ID of the subscription to process.</param>
/// <returns>A task that completes once the snapshot has been written.</returns>
/// <exception cref="InvalidOperationException">
/// No row for <paramref name="SubscriptionID"/> exists in the Subscriptions table.
/// </exception>
public static async Task GetRBACSnapshot(string SubscriptionID)
{
    var ClientID = "de9f7784-93e4-42d0-a68d-7f1457ce4e56";
    var AppKey = "XXXXXXXX";
    var sqlServerName = "XXXXXXXX.database.windows.net";
    var sqlServerAdmin = "XXXXXXXX";
    var sqlServerAdminPasword = "XXXXXXXX";
    var DatabaseName = "XXXXXXXX";
    string TenantID = null;
    string ServiceGroupName = null;
    string TeamGroupName = null;
    string ServiceName = null;
    string ServiceTreeID = null;
    string Level = null;
    string SOX = null;
    string SubscriptionName = null;
    string EnvironmentScope = null;
    string Tenant = null;

    // Record that processing of this subscription has started.
    string RefreshedAt = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff");
    string sqlStatement = "INSERT INTO ResourceStatus " +
        $"SELECT 'RBAC','{EscapeSqlLiteral(SubscriptionID)}','Started','{RefreshedAt}'";
    Helper.ExecuteTsql(sqlServerName,sqlServerAdmin,sqlServerAdminPasword,DatabaseName,sqlStatement);

    SqlConnectionStringBuilder builder = new SqlConnectionStringBuilder();

    builder.DataSource = sqlServerName;
    builder.UserID = sqlServerAdmin;
    builder.Password = sqlServerAdminPasword;
    builder.InitialCatalog = DatabaseName;

    using (SqlConnection connection = new SqlConnection(builder.ConnectionString))
    {
        connection.Open();
        // The subscription ID is passed as a parameter rather than concatenated
        // into the query text.
        string sql = "SELECT ServiceGroupName,TeamGroupName,ServiceName," +
            "ServiceTreeID,Level,SOX,SubscriptionName,Environment,Tenant " +
            "from Subscriptions where SubscriptionID = @SubscriptionID";

        using (SqlCommand command = new SqlCommand(sql,connection))
        {
            command.Parameters.AddWithValue("@SubscriptionID",SubscriptionID);

            using (SqlDataReader Reader = command.ExecuteReader())
            {
                while (Reader.Read())
                {
                    ServiceGroupName = Reader.GetString(0);
                    TeamGroupName = Reader.GetString(1);
                    ServiceName = Reader.GetString(2);
                    ServiceTreeID = Reader.GetString(3);
                    Level = Reader.GetString(4);
                    SOX = Reader.GetString(5);
                    SubscriptionName = Reader.GetString(6);
                    EnvironmentScope = Reader.GetString(7);
                    Tenant = Reader.GetString(8);
                }
            }
        }
        // No explicit Close()/ClearPool(): the using block returns the connection
        // to the pool. This method runs concurrently for many subscriptions, and
        // clearing the pool on every call would discard connections the other
        // workers could reuse.
    }

    // Tenant stays null only when the query returned no row; fail with a clear
    // message instead of a NullReferenceException on the comparisons below.
    if (Tenant == null)
    {
        throw new InvalidOperationException(
            $"Subscription '{SubscriptionID}' was not found in the Subscriptions table.");
    }

    // AARE Logic: map the tenant name to its tenant ID.
    // TenantID remains null for any tenant name not listed here.
    if (Tenant.Equals("AME",StringComparison.OrdinalIgnoreCase))
    {
        TenantID = "33e01921-4d64-4f8c-a055-5bdaffd5e33d";
    }
    else if (Tenant.Equals("GME",StringComparison.OrdinalIgnoreCase))
    {
        TenantID = "124edf19-b350-4797-aefc-3206115ffdb3";
    }
    else if (Tenant.Equals("PME",StringComparison.OrdinalIgnoreCase))
    {
        TenantID = "975f013f-7f24-47e8-a7d3-abc4752bf346";
    }
    else if (Tenant.Equals("Corp",StringComparison.OrdinalIgnoreCase))
    {
        TenantID = "72f988bf-86f1-41af-91ab-2d7cd011db47";
    }

    string AzToken = await Helper.GetAccesstokenAsync(TenantID,ClientID,AppKey);
    // NOTE(review): unlike GetAccesstokenAsync above, this call does not pass
    // ClientID — confirm against the helper's signature.
    string MSGraphToken = await Helper.GetGraphAccesstoken(TenantID,AppKey)
        .ConfigureAwait(true);

    string HttpsResponse;
    // Dispose the client and the response once the body has been read so sockets
    // are not leaked while many subscriptions are processed in parallel.
    using (var httpClient = new HttpClient
    {
        BaseAddress = new Uri("https://management.azure.com/subscriptions/")
    })
    {
        string URI = $"{SubscriptionID}/providers/Microsoft.Authorization/roleAssignments?" +
            $"api-version=2018-09-01-preview";
        httpClient.DefaultRequestHeaders.Add("Authorization","Bearer " + AzToken);

        // GetAsync throws TaskCanceledException when the request exceeds
        // HttpClient.Timeout (100 seconds by default).
        using (HttpResponseMessage response = await httpClient.GetAsync(URI).ConfigureAwait(false))
        {
            HttpsResponse = await response.Content.ReadAsStringAsync();
        }
    }

    dynamic Result = JsonConvert.DeserializeObject<object>(HttpsResponse);

    if (Result["value"] != null)
    {
        foreach (dynamic item in Result["value"])
        {
            string ObjectID = item.properties.principalId;
            string ObjectType = item.properties.principalType;
            string ObjectCategory = null;

            if (ObjectType.Equals("ServicePrincipal",StringComparison.OrdinalIgnoreCase))
            {
                ObjectCategory = "servicePrincipals";
            }
            else if (ObjectType.Equals("User",StringComparison.OrdinalIgnoreCase))
            {
                ObjectCategory = "users";
            }
            else if (ObjectType.Equals("Group",StringComparison.OrdinalIgnoreCase))
            {
                ObjectCategory = "groups";
            }

            string AccessScope = item.properties.scope;
            string displayName = await Helper.GetdisplayName(TenantID,ObjectID,MSGraphToken,ObjectCategory);

            // The JSON property is "roleDefinitionId"; dynamic member lookup is
            // case-sensitive. Its last 36 characters are the role definition GUID.
            string roleDefinitionIdFullText = item.properties.roleDefinitionId;
            string roleDefinitionId = roleDefinitionIdFullText.Substring(
                roleDefinitionIdFullText.Length - 36);
            string AccessLevel = await Helper.GetRoleDeFinitionName(roleDefinitionId,SubscriptionID,AzToken);

            string ProvisionedByObjID = item.properties.createdBy;
            // Same argument order as the principal lookup above: the Graph token
            // goes third, the object category last.
            string ProvisionedBy = await Helper.GetdisplayName(TenantID,ProvisionedByObjID,MSGraphToken,"Decide");
            string ProvisionedAt = item.properties.createdOn;
            string Compliant = Helper.ComplianceValidation(SOX,EnvironmentScope,displayName,ObjectType,AccessLevel,ProvisionedBy,AccessScope);
            RefreshedAt = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff");

            string Action;
            if (Compliant.Equals("Yes"))
            {
                Action = "No Action Needed";
            }
            else if (Compliant.Equals("No"))
            {
                Action = "Revoke";
            }
            else
            {
                Action = "Yet To Decide";
            }

            Console.WriteLine($"{ServiceGroupName},{TeamGroupName},{ServiceName}," +
                $"{ServiceTreeID},{Level},{SOX},{SubscriptionName},{SubscriptionID}," +
                $"{EnvironmentScope},{Tenant},{displayName},{ObjectType},{ObjectID}," +
                $"{AccessLevel},{ProvisionedBy},{ProvisionedAt},{AccessScope}," +
                $"{Compliant},{Action},{ProvisionedByObjID},{RefreshedAt}");

            // NOTE(review): unlike the console line above, SubscriptionID is not
            // among the inserted values — confirm against the RBACStaging schema.
            string[] stagingValues =
            {
                ServiceGroupName,TeamGroupName,ServiceName,ServiceTreeID,Level,SOX,
                SubscriptionName,EnvironmentScope,Tenant,displayName,ObjectType,ObjectID,
                AccessLevel,ProvisionedBy,ProvisionedAt,AccessScope,Compliant,Action,
                ProvisionedByObjID,RefreshedAt
            };
            // Each value is quoted and escaped so that a display name containing an
            // apostrophe (e.g. O'Brien) cannot break or alter the statement.
            sqlStatement = "INSERT INTO RBACStaging Select " +
                string.Join(",",Array.ConvertAll(stagingValues,v => $"'{EscapeSqlLiteral(v)}'"));
            try
            {
                Helper.ExecuteTsql(sqlServerName,sqlServerAdmin,sqlServerAdminPasword,DatabaseName,sqlStatement);
            }
            catch (System.Data.SqlClient.SqlException sqlException)
            {
                // Best effort: log the failed row and continue with the next one.
                Console.WriteLine(sqlException.Message);
            }
        }
    } // End of IF Condition to check Null value from Rest API Calls.

    // Mark this subscription as completed.
    RefreshedAt = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff");
    sqlStatement = "update ResourceStatus Set Status = 'Completed'," +
        $"LastUpdatedTime = '{RefreshedAt}' where ResourceType = 'RBAC' and " +
        $"SubscriptionID = '{EscapeSqlLiteral(SubscriptionID)}'";
    Helper.ExecuteTsql(sqlServerName,sqlServerAdmin,sqlServerAdminPasword,DatabaseName,sqlStatement);
} // End of Method

// Doubles single quotes so a value can be embedded safely inside a T-SQL string
// literal; null is returned unchanged (it interpolates as an empty string).
private static string EscapeSqlLiteral(string value)
{
    return value == null ? null : value.Replace("'","''");
}

异常:

TaskCanceledException: A task was canceled
at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
   at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout,CancellationToken cancellationToken)
   at System.Threading.Tasks.Task.Wait()
   at System.Threading.Tasks.Parallel.ForWorker[TLocal](Int32 fromInclusive,Int32 toExclusive,ParallelOptions parallelOptions,Action`1 body,Action`2 bodyWithState,Func`4 bodyWithLocal,Func`1 localInit,Action`1 localFinally)
   at System.Threading.Tasks.Parallel.ForEachWorker[TSource,TLocal](IList`1 list,Action`3 bodyWithStateAndindex,Func`4 bodyWithStateAndLocal,Func`5 bodyWithEverything,TLocal](IEnumerable`1 source,Action`1 localFinally)
   at System.Threading.Tasks.Parallel.ForEach[TSource](IEnumerable`1 source,Action`1 body)
   at CPXFundamentals.Program.Main() in C:\Users\venkatag\source\repos\CPXFundamentals\Program.cs:line 47

解决方法

如果您无法访问抛出 TaskCanceledException 的那部分代码的实现,可以考虑只捕获 GetRBACSnapshot 抛出的 TaskCanceledException。

例如,您可以捕获 TaskCanceledException 并将该订阅添加回队列以供稍后重试。

请确保不要吞掉所有异常,而只处理 TaskCanceledException,以免无意中掩盖严重的运行时错误。

类似下面的代码可能对您有用:

// Processes every subscription in parallel; a subscription whose snapshot is
// canceled (for example by an HTTP timeout) is reported instead of aborting the
// whole loop.
int N = 1;
Parallel.ForEach(Subscriptions,s =>
{
    string SubscriptionID = s.Replace(" ",String.Empty);
    Console.WriteLine($"Working on {N}. {SubscriptionID}");

    try
    {
        // GetAwaiter().GetResult() rethrows the task's own exception. Wait() would
        // wrap it in an AggregateException, so the catch below would never match.
        RBACSnapshot.GetRBACSnapshot(SubscriptionID).GetAwaiter().GetResult();
        Interlocked.Increment(ref N);
    }
    catch(TaskCanceledException)
    {
        Console.WriteLine($"{SubscriptionID} Timed out");
        // do something else like add it back to a queue to be tried again
        // just make sure any collection you add the subscription to is thread safe
    }
});