Polly CircuitBreaker 在断路时更改 HttpClient 基地址以继续执行请求 我现在有什么?一般情况在代码中场景说明

问题描述

我现在有什么?

目前,我有一个配置了 RetryAsync 策略的客户端,该策略使用主地址并在故障时切换到故障转移地址。从机密管理器读取连接详细信息。

services
    .AddHttpClient ("MyClient",client => client.BaseAddress = PlaceholderUri)
    .ConfigureHttpMessageHandlerBuilder (builder => {

        // loads settings from secret manager
        var settings = configLoader.LoadSettings().Result;

        builder.PrimaryHandler = new HttpClientHandler {
            Credentials = new NetworkCredential (settings.Username,settings.Password),AutomaticDecompression = DecompressionMethods.GZip
        };

        var primaryBaseAddress = new Uri (settings.Host);
        var failoverBaseAddress = new Uri (settings.DrHost);

        builder.AdditionalHandlers.Add (new PolicyHttpMessageHandler (requestMessage => {
            var relativeAddress = PlaceholderUri.MakeRelativeUri (requestMessage.RequestUri);
            requestMessage.RequestUri = new Uri (primaryBaseAddress,relativeAddress);

            return HttpPolicyExtensions.HandleTransientHttpError ()
                .RetryAsync ((result,retryCount) =>
                    requestMessage.RequestUri = new Uri (failoverBaseAddress,relativeAddress));
        }));
    });

我想达到什么目的?

一般情况

我的客户可以使用主服务或故障转移服务。当主节点关闭时,使用故障转移直到主节点恢复正常。当两者都关闭时,我们会收到警报,并且可以通过机密管理器动态更改服务地址。

在代码中

现在我还想介绍一个 CircuitBreakerPolicy 并将这 2 个策略链接在一起。我正在寻找一种封装的配置,并且在客户端级别而不是在使用该客户端的类上处理故障。

场景说明

假设有一个断路器策略包含在单个客户端的重试策略中。

断路器配置为在 3 次尝试失败暂时性错误断开电路60 秒在主基地址上。 OnBreak - 地址从主要更改为故障转移。

重试策略配置为处理BrokenCircuitException,并重试一次,地址从主要更改为故障转移以继续.

  1. 请求主要地址 - 500 代码
  2. 请求主要地址 - 500 代码
  3. 对主地址的请求 - 500 代码(连续 3 次失败)
  4. 电路中断 60 秒
  5. 对主地址的请求 - BrokenCircuitException 被重试策略捕获,调用故障转移
  6. 对主地址的请求 - BrokenCircuitException 被重试策略捕获,调用故障转移
  7. 对主地址的请求 - BrokenCircuitException 被重试策略捕获,调用故障转移
  8. 对主地址的请求 - BrokenCircuitException 被重试策略捕获,调用故障转移
  9. (60 秒后)电路半开 -(这里可以再断开 60 秒或打开 - 假设打开)
  10. 请求主要地址 - 200 代码

如本 articles 中所述,有一个使用包含在回退中的断路器的解决方案,但正如您在那里看到的,默认和回退的逻辑是在类中实现的,而不是在客户端级别实现的。

我愿意

public class OpenExchangeRatesClient
{
    private readonly HttpClient _client;
    private readonly Policy _policy;
    public OpenExchangeRatesClient(string apiUrl)
    {
        _client = new HttpClient
        {
            BaseAddress = new Uri(apiUrl),};

        var circuitBreaker = Policy
            .Handle<Exception>()
            .CircuitBreakerAsync(
                exceptionsAllowedBeforeBreaking: 2,durationOfBreak: TimeSpan.FromMinutes(1)
            );

        _policy = Policy
            .Handle<Exception>()
            .FallbackAsync(() => GetFallbackRates())
            .Wrap(circuitBreaker);
    }

    public Task<ExchangeRates> GetLatestRates()
    {
        return _policy
            .ExecuteAsync(() => CallRatesApi());
    }

    public Task<ExchangeRates> CallRatesApi()
    {
        //call the API,parse the results
    }

    public Task<ExchangeRates> GetFallbackRates()
    {
        // load the rates from the embedded file and parse them
    }
}

改写为

public class OpenExchangeRatesClient 
{
    private readonly HttpClient _client;
    public OpenExchangeRatesClient (IHttpClientFactory clientFactory) {
        _client = clientFactory.CreateClient ("MyClient");
    }

    public Task<ExchangeRates> GetLatestRates () {
        return _client.GetAsync ("/rates-gbp-usd");
    }
}

我读了什么?

我尝试了什么?

我尝试了几种不同的场景来链接和组合断路器策略与重试策略,以在启动文件中的客户端杠杆上实现所需的目标。最后一个状态如下。这些策略按照重试能够捕获 BrokenCircuitException 的顺序进行包装,但事实并非如此。异常是在消费者类上抛出的,这不是想要的结果。虽然触发了RetryPolicy,但是还是抛出了消费者类的异常。

var retryPolicy = GetRetryPolicy();
var circuitBreaker = GetCircuitBreakerPolicy();

var policyWraper = Policy.WrapAsync(retryPolicy,circuitBreaker);

services
    .AddHttpClient("TestClient",client => client.BaseAddress = GetPrimaryUri())
    .AddPolicyHandler(policyWraper);

static IAsyncPolicy<HttpResponseMessage> GetCircuitBreakerPolicy()
{
    return HttpPolicyExtensions
        .HandleTransientHttpError()
        .CircuitBreakerAsync(
            3,TimeSpan.FromSeconds(45),OnBreak,OnReset,OnHalfOpen);
}

static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
    return Policy<HttpResponseMessage>
        .Handle<Exception>()
        .RetryAsync(1,(response,retryCount) =>
        {
            Debug.WriteLine("Retries on broken circuit");
        });
}

我省略了方法 OnBreakOnResetOnHalfOpen,因为它们只是打印一些消息。

更新:添加了来自控制台的日志。

Circuit broken (after 3 attempts)
Retries on broken
Exception thrown: 'System.AggregateException' in System.Private.CoreLib.dll 
Retries on broken circuit
Exception thrown: 'System.AggregateException' in System.Private.CoreLib.dll

'CircuitBreakerPolicy.exe' (CoreCLR: clrhost): 加载 'C:\Program 断路重试 抛出异常:System.Private.CoreLib.dll 中的“System.AggregateException”

更新 2:为使用配置了策略的客户端的类添加了参考 URL

更新 3:project 已更新,因此 WeatherService2.Get 的实现以所需的方式工作:当主服务不可用时,电路中断,使用故障转移服务,直到电路可用关闭。这将是这个问题的答案,但是我想探索一种解决方案,在该解决方案中,使用 WeatherService.Get 以及 Startup 上的适当策略和客户端设置可以实现相同的结果。 >

使用客户端引用 class。 使用类引用 project

在上面的日志中可以看到断路器抛出的 Exception thrown: 'System.AggregateException' in System.Private.CoreLib.dll - 这是意料之中的,因为有重试包裹断路器。

解决方法

我已经下载了您的项目并试用了它,以下是我的观察:

阻塞与非阻塞

  • 因为您的代码使用了阻塞异步调用 (.Result),所以您会看到 AggregateException
public IEnumerable<WeatherForecast> Get()
{
    HttpResponseMessage response = null;
    try
    {
        response = _client.GetAsync(string.Empty).Result; //AggregateException  
    }
    catch (Exception e)
    {
        Debug.WriteLine($"{e.Message}");
    }
    ...
}
  • 为了解开 InnerExceptionAggregateException,您需要使用 await
public async Task<IEnumerable<WeatherForecast>> Get()
{
    HttpResponseMessage response = null;
    try
    {
        response = await _client.GetAsync(string.Empty); //BrokenCircuitException
    }
    catch (Exception e)
    {
        Debug.WriteLine($"{e.Message}");
    }
    ...
}

升级

每当您将一项政策包装到另一个政策中时,都可能会发生升级。这意味着如果内部无法处理问题,那么它会将相同的问题传播到外部,后者可能会也可能无法处理。如果最外层没有处理问题,那么(大多数情况下)原始异常将被抛给弹性策略(策略组合)的使用者。

您可以在此处找到有关 escalation 的更多详细信息。

让我们在您的案例中回顾一下这个概念:

var policyWrapper = Policy.WrapAsync(retryPolicy,circuitBreaker);

static IAsyncPolicy<HttpResponseMessage> GetCircuitBreakerPolicy()
{
    return HttpPolicyExtensions
        .HandleTransientHttpError()
        .CircuitBreakerAsync(3,TimeSpan.FromSeconds(45),...);
}

static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
    return Policy<HttpResponseMessage>
        .Handle<Exception>()
        .RetryAsync(1,...);
}
  1. 针对 https://httpstat.us/500 发出初始请求(1. 尝试)
  2. 返回 500,这会将连续瞬态故障从 0 增加到 1
  3. CB 将问题上报以重试
  4. 重试未处理状态 500,因此不会触发重试
  5. httpClient 返回带有 HttpResponseMessage 状态代码的 InternalServerError

让我们修改重试策略以处理瞬态 http 错误:

static IAsyncPolicy<HttpResponseMessage> GetCircuitBreakerPolicy()
{
    return HttpPolicyExtensions
        .HandleTransientHttpError()
        .CircuitBreakerAsync(3,...);
}

static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
    return HttpPolicyExtensions
        .HandleTransientHttpError()
        .Or<Exception>()
        .RetryAsync(1,...);
}
  1. 针对 https://httpstat.us/500 发出初始请求(1. 尝试)
  2. 返回 500,这会将连续瞬态故障从 0 增加到 1
  3. CB 将问题上报以重试
  4. 重试正在处理状态 500,因此重试会立即发出另一次尝试
  5. 针对 https://httpstat.us/500 发出第 1 次重试请求(2. 尝试)
  6. 它返回 500,这会将连续瞬态故障从 1 增加到 2
  7. CB 将问题上报以重试
  8. 即使重试正在处理状态 500,它也不会触发,因为它达到了重试计数 (1)
  9. httpClient 返回带有 HttpResponseMessage 状态代码的 InternalServerError

现在,让我们将连续失败计数从 3 降低到 1 并明确处理 BrokenCircuitException

static IAsyncPolicy<HttpResponseMessage> GetCircuitBreakerPolicy()
{
    return HttpPolicyExtensions
        .HandleTransientHttpError()
        .CircuitBreakerAsync(1,...);
}

static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
    return HttpPolicyExtensions
        .HandleTransientHttpError()
        .Or<BrokenCircuitException>()
        .RetryAsync(1,...);
}
  1. 针对 https://httpstat.us/500 发出初始请求(1. 尝试)
  2. 返回 500,这会将连续瞬态故障从 0 增加到 1
  3. 断路器打开,因为它达到了预定义的阈值
  4. CB 将问题上报以重试
  5. 重试正在处理状态 500,因此重试会立即发出另一次尝试
  6. 针对 https://httpstat.us/500 发出第 1 次重试请求(2. 尝试)
  7. CB 阻止此调用,因为它已损坏
  8. CB 抛出一个 BrokenCircuitException
  9. 即使 Retry 正在处理 BrokenCircuitException 它也不会触发,因为它达到了它的 retrycount (1)
  10. 重试会抛出原​​始异常 (BrokenCircuitException),因此 httpClient 的 GetAsync 会抛出该异常。

最后让我们将 retryCount 从 1 增加到 2:

static IAsyncPolicy<HttpResponseMessage> GetCircuitBreakerPolicy()
{
    return HttpPolicyExtensions
        .HandleTransientHttpError()
        .CircuitBreakerAsync(1,...);
}

static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
    return HttpPolicyExtensions
        .HandleTransientHttpError()
        .Or<BrokenCircuitException>()
        .RetryAsync(2,...);
}
  1. 针对 https://httpstat.us/500 发出初始请求(1. 尝试)
  2. 返回 500,这会将连续瞬态故障从 0 增加到 1
  3. 断路器打开,因为它达到了预定义的阈值
  4. CB 将问题上报以重试
  5. 重试正在处理状态 500,因此重试会立即发出另一次尝试
  6. 针对 https://httpstat.us/500 发出第 1 次重试请求(2. 尝试)
  7. CB 阻止此调用,因为它已损坏
  8. CB 抛出一个 BrokenCircuitException
  9. 重试正在处理 BrokenCircuitException 并且它没有超过其 retryCount,因此它立即发出另一次尝试
  10. 针对 https://httpstat.us/500 发出第二次重试请求(3. 尝试)
  11. CB 阻止此调用,因为它已损坏
  12. CB 抛出一个 BrokenCircuitException
  13. 即使 Retry 正在处理 BrokenCircuitException 它也不会触发,因为它达到了它的 retrycount (2)
  14. 重试将抛出原始异常 (BrokenCircuitException),因此 httpClient 的 GetAsync 将抛出该异常。

我希望这个练习能帮助您更好地理解如何创建弹性策略,通过升级问题来组合多个策略。

,

我已经查看了您的替代解决方案,该解决方案与我在上一篇博文中讨论的设计问题相同。

public WeatherService2(IHttpClientFactory clientFactory,IEnumerable<IAsyncPolicy<HttpResponseMessage>> policies)
{
    _primaryClient = clientFactory.CreateClient("PrimaryClient");
    _failoverClient = clientFactory.CreateClient("FailoverClient");
    _circuitBreaker = policies.First(p => p.PolicyKey == "CircuitBreaker");

    _policy = Policy<HttpResponseMessage>
        .Handle<Exception>()
        .FallbackAsync(_ => CallFallbackForecastApi())
        .WrapAsync(_circuitBreaker);
}

public async Task<string> Get()
{
    var response = await _policy.ExecuteAsync(async () => await CallForecastApi());

    if (response.IsSuccessStatusCode) 
        return response.StatusCode.ToString();

    response = await CallFallbackForecastApi();
    return response.StatusCode.ToString();
}

您的后备策略永远不会被触发。

  1. HttpClient 收到状态码为 500 的响应
  2. 断路器中断
  3. CB 将带有 statusCode 500 的 HttpResponseMessage 传播到外部策略
  4. 回退不会触发,因为它是为异常设置的 Handle<Exception>()
  5. 策略返回状态代码为 500 的 HttpResponseMessage
  6. 您的代码手动检查响应,然后手动调用回退。

如果您将政策更改为:

_policy = Policy
    .HandleResult<HttpResponseMessage>(response => response != null && !response.IsSuccessStatusCode)
    .Or<Exception>()
    .FallbackAsync(_ => CallFallbackForecastApi())
    .WrapAsync(_circuitBreaker);

那么就不需要手动回退。

  1. HttpClient 收到状态码为 500 的响应
  2. 断路器中断
  3. CB 将带有 statusCode 500 的 HttpResponseMessage 传播到外部策略
  4. 回退触发,因为它也设置为不成功的状态代码
  5. HttpClient 收到 statusCode 200 的响应
  6. 策略返回状态代码为 500 的 HttpResponseMessage

您还需要了解一件更重要的事情。前面的代码有效,因为您已在没有断路器策略的情况下注册了 HttpClient。

这意味着 CB 未附加到 HttpClient。因此,如果您像这样更改代码:

public async Task<HttpResponseMessage> CallForecastApi()
    => await _primaryClient.GetAsync("https://httpstat.us/500/");

public async Task<HttpResponseMessage> CallFallbackForecastApi()
    => await _primaryClient.GetAsync("https://httpstat.us/200/");

那么即使第一次尝试后断路器将打开,CallFallbackForecastApi 也不会抛出 BrokenCircuitException

但是,如果您像这样将 CB 附加到 HttpClient:

services
    .AddHttpClient("PrimaryClient",client => client.BaseAddress = PlaceholderUri)
    ...
    .AddPolicyHandler(GetCircuitBreakerPolicy());

然后像这样简化 WeatherService2

private readonly HttpClient _primaryClient;
private readonly IAsyncPolicy<HttpResponseMessage> _policy;

public WeatherService2(IHttpClientFactory clientFactory)
{
    _primaryClient = clientFactory.CreateClient("PrimaryClient");
    _policy = Policy
        .HandleResult<HttpResponseMessage>(response => response != null && !response.IsSuccessStatusCode)
        .Or<Exception>()
        .FallbackAsync(_ => CallFallbackForecastApi());
}

然后它会以 BrokenCircuitException 惨遭失败。


如果您的 WeatherService2 如下所示:

public class WeatherService2 : IWeatherService2
{
    private readonly HttpClient _primaryClient;
    private readonly HttpClient _secondaryClient;
    private readonly IAsyncPolicy<HttpResponseMessage> _policy;
    public WeatherService2(IHttpClientFactory clientFactory)
    {
        _primaryClient = clientFactory.CreateClient("PrimaryClient");
        _secondaryClient = clientFactory.CreateClient("FailoverClient");

        _policy = Policy
            .HandleResult<HttpResponseMessage>(response => response != null && !response.IsSuccessStatusCode)
            .Or<Exception>()
            .FallbackAsync(_ => CallFallbackForecastApi());
    }

    public async Task<string> Get()
    {
        var response = await _policy.ExecuteAsync(async () => await CallForecastApi());
        return response.StatusCode.ToString();
    }

    public async Task<HttpResponseMessage> CallForecastApi()
        => await _primaryClient.GetAsync("https://httpstat.us/500/");

    public async Task<HttpResponseMessage> CallFallbackForecastApi()
        => await _secondaryClient.GetAsync("https://httpstat.us/200/");
}

那么它可以正常工作只有PrimaryClientFailoverClient具有不同的断路器

services
    .AddHttpClient("PrimaryClient",client => client.BaseAddress = PlaceholderUri)
    ...
    .AddPolicyHandler(GetCircuitBreakerPolicy());

services
    .AddHttpClient("FailoverClient",client => client.BaseAddress = PlaceholderUri)
    ...
    .AddPolicyHandler(GetCircuitBreakerPolicy());

如果他们共享同一个断路器,那么第二次调用将再次失败并显示 BrokenCircuitException

var cbPolicy = GetCircuitBreakerPolicy();

services
    .AddHttpClient("PrimaryClient",client => client.BaseAddress = PlaceholderUri)
    ...
    .AddPolicyHandler(cbPolicy);

services
    .AddHttpClient("FailoverClient",client => client.BaseAddress = PlaceholderUri)
    ...
    .AddPolicyHandler(cbPolicy);

相关问答

错误1:Request method ‘DELETE‘ not supported 错误还原:...
错误1:启动docker镜像时报错:Error response from daemon:...
错误1:private field ‘xxx‘ is never assigned 按Alt...
报错如下,通过源不能下载,最后警告pip需升级版本 Requirem...