如何对IEnumerable的所有元素执行Polly策略,并在出现第一个未处理的异常时停止

问题描述

Polly库的策略,例如BulkheadRetry等,包含具有许多重载(18)的方法ExecuteAsync,但是它们都不允许对IEnumerable的所有元素执行策略并收集结果。似乎整个库都集中在执行单个动作的目标上,而将管理多个执行的责任留给了客户端代码。我想通过对所有Polly策略(IAsyncPolicy接口的所有实现)实施扩展方法解决此遗漏,并使用以下签名:

public static Task<TResult[]> ExecuteAsync<TSource,TResult>(
    this IAsyncPolicy policy,IEnumerable<TSource> source,Func<TSource,Task<TResult>> action,bool continueOnCapturedContext = false,bool onErrorContinue = false)

continueOnCapturedContext参数控制是否在捕获的同步上下文上继续,应该只传递 到原生ExecuteAsync方法

Task<TResult> IAsyncPolicy.ExecuteAsync<TResult>(
    Func<CancellationToken,CancellationToken cancellationToken,bool continueOnCapturedContext);

onErrorContinue参数是此问题的最重要方面,因为它控制了策略失败时的行为。我的意图是将这种扩展方法与数千个元素一起使用,并且在我的策略¹未预期/无法处理的任何例外情况下,我想迅速而优雅地终止整个执行过程。如果参数onErrorContinue具有认值false,则第一个未处理的异常将导致取消所有挂起的操作,并且所有开始的操作完成后,整个执行应终止。在onErrorContinue: true的相反情况下,所有元素应由策略处理。最后,应该传播所有异常,并将其捆绑在AggregateException中,与onErrorContinue值无关。

如何实现此扩展方法

方法的假设使用场景:

var policy = Policy
    .BulkheadAsync(maxParallelization: 10,maxQueuingActions: Int32.MaxValue)
    .WrapAsync(Policy
        .Handle<HttpRequestException>()
        .WaitAndRetryAsync(retryCount: 3,sleepDurationProvider: n => TimeSpan.FromMilliseconds(1000 * n))
    );

var urls = Enumerable.Range(1,1000).Select(n => n.ToString());
var random = new Random(0);
string[] results = await policy.ExecuteAsync(urls,async url =>
{
    await Task.Delay(500); // Simulate a web request
    lock (random) if (random.NextDouble() < 0.66)
        throw new HttpRequestException($"Url #{url} Failed");
    return url;
},onErrorContinue: false);

¹这种情况很少在生产中发生,但可能在开发过程中频繁发生,并且可能会损害生产率。

解决方法

这是我对using System; namespace ConsoleApp1 { abstract class Vehicle { public abstract string Name { get; } public override string ToString() => $"This vehicle is a {Name}."; // To keep the output identical to your example,use this method // public override string ToString() => $"We don't know what kind of vehicle this is."; // You then have to override ToString in Sedan,too. But I think there's no reason // why you'd want to have to override ToString everywhere with the same code. } internal class Sedan : Vehicle { public override string Name => "Sedan"; } abstract class AbstractLoad { public abstract string Name { get; } public override string ToString() => Name; } internal class Oil : AbstractLoad { public override string Name => "Oil"; } internal class Lumber : AbstractLoad { public override string Name => "Lumber"; } class Semi<TLoad> : Vehicle where TLoad : AbstractLoad { public override string Name => "Semi"; public TLoad Load { get; set; } public override string ToString() => $"This vehicle is a semi carrying {Load}."; } class Program { static void Main(string[] args) { var vehicle = new Semi<Oil> { Load = new Oil() }; DescribeVehicle(vehicle); } static void DescribeVehicle(Vehicle vehicle) { Console.WriteLine(vehicle.ToString()); } } } 方法的实现。 ExecuteAsync用于在发生异常的情况下取消挂起的操作。当有更重要的例外要传播时,CancellationTokenSource可以很好地忽略Task.WhenAll。最终,OperationCanceledException任务没有进行Task.WhenAll返回,以保留所有异常。

await

模拟public static Task<TResult[]> ExecuteAsync<TSource,TResult>( this IAsyncPolicy policy,IEnumerable<TSource> source,Func<TSource,Task<TResult>> action,bool continueOnCapturedContext = false,bool onErrorContinue = false) { // Arguments validation omitted var cts = new CancellationTokenSource(); var token = !onErrorContinue ? cts.Token : default; var tasks = source.Select(async (item) => { try { return await policy.ExecuteAsync(async _ => { return await action(item); },token,continueOnCapturedContext); } catch { if (!onErrorContinue) cts.Cancel(); throw; } }).ToArray(); var whenAll = Task.WhenAll(tasks); _ = whenAll.ContinueWith(_ => cts.Dispose(),TaskScheduler.Default); return whenAll; } 行为(在这种情况下是理想的),否则为is quite tricky(通过异步/等待)。因此,我很高兴通过使用小型dirty Task.WhenAll来避免这种麻烦,以便最终dispose ContinueWith

here介绍了处理多个异常的另一种方法。此解决方案传播了一个嵌套的CancellationTokenSource,听起来很丑陋,但实际上是可以的,因为AggregateException使用异步方法无论如何都消除了一层嵌套。