问题描述
我在 C# 中运行 Parallel.ForEach
。我低于TaskCanceledException: A task was canceled
。任何人都可以建议我在哪里可以解决我的任务取消问题,或增加我的连接超时时间?
我的代码:
private static void Main()
{
var sqlServerName = "XXXXXXXXXX.database.windows.net";
var sqlServerAdmin = "XXXXXXXXXX";
var sqlServerAdminPasword = "XXXXXXXXXX";
var DatabaseName = "XXXXXXXXXX";
string sqlStatement = ($"DELETE FROM ResourceStatus WHERE ResourceType = 'RBAC' " +
$"TruncATE TABLE RBACStaging ");
Helper.ExecuteTsql(sqlServerName,sqlServerAdmin,sqlServerAdminPasword,DatabaseName,sqlStatement);
sqlConnectionStringBuilder builder = new sqlConnectionStringBuilder();
builder.DataSource = sqlServerName;
builder.UserID = sqlServerAdmin;
builder.Password = sqlServerAdminPasword;
builder.InitialCatalog = DatabaseName;
List<string> Subscriptions = new List<string>();
using (sqlConnection connection = new sqlConnection(builder.ConnectionString))
{
connection.open();
StringBuilder sb = new StringBuilder();
sb.Append($"SELECT SubscriptionID from Subscriptions " +
$"where SOX = 'SR0' and Environment = 'Prod'");
string sql = sb.ToString();
using (sqlCommand command = new sqlCommand(sql,connection))
{
using (sqlDataReader Reader = command.ExecuteReader())
{
while (Reader.Read())
{
Subscriptions.Add(Reader.GetString(0).ToString());
}
connection.Close();
sqlConnection.ClearPool(connection);
}
}
}
Parallel.ForEach(Subscriptions,s =>
{
string SubscriptionID = s.Replace(" ",String.Empty);
RBACSnapshot.GetRBACSnapshot(SubscriptionID).Wait();
});
}
GetRBACSnapshot
方法:
public static async Task GetRBACSnapshot(string SubscriptionID)
{
var ClientID = "de9f7784-93e4-42d0-a68d-7f1457ce4e56";
var AppKey = "XXXXXXXX";
var sqlServerName = "XXXXXXXX.database.windows.net";
var sqlServerAdmin = "XXXXXXXX";
var sqlServerAdminPasword = "XXXXXXXX";
var DatabaseName = "XXXXXXXX";
string TenantID = null;
string ServiceGroupName = null;
string TeamGroupName = null;
string ServiceName = null;
string ServiceTreeID = null;
string Level = null;
string SOX = null;
string SubscriptionName = null;
string EnvironmentScope = null;
string Tenant = null;
DateTime RawDate = DateTime.Now;
string RefreshedAt = RawDate.ToString("yyyy-MM-dd HH:mm:ss.fff");
string sqlStatement = null;
sqlStatement = ($"INSERT INTO ResourceStatus " +
$"SELECT 'RBAC','{SubscriptionID}','Started','{RefreshedAt}'");
Helper.ExecuteTsql(sqlServerName,sqlStatement);
sqlConnectionStringBuilder builder = new sqlConnectionStringBuilder();
builder.DataSource = sqlServerName;
builder.UserID = sqlServerAdmin;
builder.Password = sqlServerAdminPasword;
builder.InitialCatalog = DatabaseName;
using (sqlConnection connection = new sqlConnection(builder.ConnectionString))
{
connection.open();
StringBuilder sb = new StringBuilder();
sb.Append($"SELECT ServiceGroupName,TeamGroupName,ServiceName," +
$"ServiceTreeID,Level,SOX,SubscriptionName,Environment,Tenant " +
$"from Subscriptions where SubscriptionID = '{SubscriptionID}'");
string sql = sb.ToString();
using (sqlCommand command = new sqlCommand(sql,connection))
{
using (sqlDataReader Reader = command.ExecuteReader())
{
while (Reader.Read())
{
ServiceGroupName = Reader.GetString(0).ToString();
TeamGroupName = Reader.GetString(1).ToString();
ServiceName = Reader.GetString(2).ToString();
ServiceTreeID = Reader.GetString(3).ToString();
Level = Reader.GetString(4).ToString();
SOX = Reader.GetString(5).ToString();
SubscriptionName = Reader.GetString(6).ToString();
EnvironmentScope = Reader.GetString(7).ToString();
Tenant = Reader.GetString(8).ToString();
}
}
}
connection.Close();
sqlConnection.ClearPool(connection);
}
// AARE Logic
if (Tenant.Equals("AME",StringComparison.OrdinalIgnoreCase))
{
TenantID = "33e01921-4d64-4f8c-a055-5bdaffd5e33d";
}
else if (Tenant.Equals("GME",StringComparison.OrdinalIgnoreCase))
{
TenantID = "124edf19-b350-4797-aefc-3206115ffdb3";
}
else if (Tenant.Equals("PME",StringComparison.OrdinalIgnoreCase))
{
TenantID = "975f013f-7f24-47e8-a7d3-abc4752bf346";
}
else if (Tenant.Equals("Corp",StringComparison.OrdinalIgnoreCase))
{
TenantID = "72f988bf-86f1-41af-91ab-2d7cd011db47";
}
string AzToken = await Helper.GetAccesstokenAsync(TenantID,ClientID,AppKey);
string MSGraphToken = await Helper.GetGraphAccesstoken(TenantID,AppKey)
.ConfigureAwait(true);
var httpClient = new HttpClient
{
BaseAddress = new Uri("https://management.azure.com/subscriptions/")
};
string URI = $"{SubscriptionID}/providers/Microsoft.Authorization/roleAssignments?" +
$"api-version=2018-09-01-preview";
httpClient.DefaultRequestHeaders.Remove("Authorization");
httpClient.DefaultRequestHeaders.Add("Authorization","Bearer " + AzToken);
HttpResponseMessage response = await httpClient.GetAsync(URI).ConfigureAwait(false);
var HttpsResponse = await response.Content.ReadAsstringAsync();
dynamic Result = JsonConvert.DeserializeObject<object>(HttpsResponse);
if (Result["value"] != null)
{
foreach (dynamic item in Result["value"])
{
string ObjectID = item.properties.principalId;
string ObjectType = item.properties.principalType;
string ObjectCategory = null;
if (ObjectType.Equals("ServicePrincipal",StringComparison.OrdinalIgnoreCase))
{
ObjectCategory = "servicePrincipals";
}
else if (ObjectType.Equals("User",StringComparison.OrdinalIgnoreCase))
{
ObjectCategory = "users";
}
else if (ObjectType.Equals("Group",StringComparison.OrdinalIgnoreCase))
{
ObjectCategory = "groups";
}
string AccessScope = item.properties.scope;
string displayName = await Helper.GetdisplayName(TenantID,ObjectID,MSGraphToken,ObjectCategory);
string RoleDeFinitionIDFullText = item.properties.roleDeFinitionId;
string RoleDeFinitionID = RoleDeFinitionIDFullText.Substring(
RoleDeFinitionIDFullText.Length - 36);
string AccessLevel = await Helper.GetRoleDeFinitionName(RoleDeFinitionID,SubscriptionID,AzToken);
string ProvisionedByObjID = item.properties.createdBy;
string ProvisionedBy = await Helper.GetdisplayName(TenantID,ProvisionedByObjID,"Decide");
string ProvisionedAt = item.properties.createdOn;
string Compliant = Helper.ComplianceValidation(SOX,EnvironmentScope,displayName,ObjectType,AccessLevel,ProvisionedBy,AccessScope);
RawDate = DateTime.Now;
RefreshedAt = RawDate.ToString("yyyy-MM-dd HH:mm:ss.fff");
string Action = null;
if (Compliant.Equals("Yes"))
{
Action = "No Action Needed";
}
else if (Compliant.Equals("No"))
{
Action = "Revoke";
}
else
{
Action = "Yet To Decide";
}
Console.WriteLine($"{ServiceGroupName},{TeamGroupName},{ServiceName}," +
$"{ServiceTreeID},{Level},{SOX},{SubscriptionName},{SubscriptionID}," +
$"{EnvironmentScope},{Tenant},{displayName},{ObjectType},{ObjectID}," +
$"{AccessLevel},{ProvisionedBy},{ProvisionedAt},{AccessScope}," +
$"{Compliant},{Action},{ProvisionedByObjID},{RefreshedAt}"); ;
sqlStatement = ($"INSERT INTO RBACStaging Select '{ServiceGroupName}'," +
$"'{TeamGroupName}','{ServiceName}','{ServiceTreeID}','{Level}','{SOX}'" +
$",'{SubscriptionName}','{EnvironmentScope}'," +
$"'{Tenant}','{displayName}','{ObjectType}','{ObjectID}','{AccessLevel}'," +
$"'{ProvisionedBy}','{ProvisionedAt}','{AccessScope}','{Compliant}'," +
$"'{Action}','{ProvisionedByObjID}','{RefreshedAt}'");
try
{
Helper.ExecuteTsql(sqlServerName,sqlStatement);
}
catch (System.Data.sqlClient.sqlException sqlException)
{
Console.WriteLine(sqlException.Message);
}
}
} // End of IF Condition to check Null value from Rest API Calls.
RawDate = DateTime.Now;
RefreshedAt = RawDate.ToString("yyyy-MM-dd HH:mm:ss.fff");
sqlStatement = ($"update ResourceStatus Set Status = 'Completed'," +
$"LastUpdatedTime = '{RefreshedAt}' where ResourceType = 'RBAC' and " +
$"SubscriptionID = '{SubscriptionID}'");
Helper.ExecuteTsql(sqlServerName,sqlStatement);
} // End of Method
例外:
TaskCanceledException: A task was canceled
at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout,CancellationToken cancellationToken)
at System.Threading.Tasks.Task.Wait()
at System.Threading.Tasks.Parallel.ForWorker[TLocal](Int32 fromInclusive,Int32 toExclusive,ParallelOptions parallelOptions,Action`1 body,Action`2 bodyWithState,Func`4 bodyWithLocal,Func`1 localInit,Action`1 localFinally)
at System.Threading.Tasks.Parallel.ForEachWorker[TSource,TLocal](IList`1 list,Action`3 bodyWithStateAndindex,Func`4 bodyWithStateAndLocal,Func`5 bodyWithEverything,TLocal](IEnumerable`1 source,Action`1 localFinally)
at System.Threading.Tasks.Parallel.ForEach[TSource](IEnumerable`1 source,Action`1 body)
at CPXFundamentals.Program.Main() in C:\Users\venkatag\source\repos\CPXFundamentals\Program.cs:line 47
解决方法
如果您无法访问 TaskCanceledException
的实现,您可能需要考虑只捕获任何 GetRBACSnapshot
。
例如,您可以捕获 TaskCanceledException
并将该订阅添加回队列以供稍后重试。
请确保不要消耗所有错误,而仅消耗 TaskCanceledException
。您应该避免无意中消耗严重的运行时错误。
类似的东西可能对你有用:
int N = 1;
Parallel.ForEach(Subscriptions,s =>
{
string SubscriptionID = s.Replace(" ",String.Empty);
Console.WriteLine($"Working on {N}. {SubscriptionID}");
try
{
RBACSnapshot.GetRBACSnapshot(SubscriptionID).Wait();
Interlocked.Increment(ref N);
}
catch(TaskCanceledException)
{
Console.WriteLine($"{SubscriptionID} Timed out");
// do something else like add it back to a queue to be tried again
// just make sure any collection you add the subscription to is thread safe
}
});