问题描述
我现在有什么?
目前,我有一个配置了 RetryAsync
策略的客户端,该策略使用主地址并在故障时切换到故障转移地址。从机密管理器读取连接详细信息。
services
.AddHttpClient ("MyClient",client => client.BaseAddress = PlaceholderUri)
.ConfigureHttpMessageHandlerBuilder (builder => {
// loads settings from secret manager
var settings = configLoader.LoadSettings().Result;
builder.PrimaryHandler = new httpclienthandler {
Credentials = new NetworkCredential (settings.Username,settings.Password),AutomaticDecompression = DecompressionMethods.GZip
};
var primaryBaseAddress = new Uri (settings.Host);
var failoverBaseAddress = new Uri (settings.DrHost);
builder.AdditionalHandlers.Add (new PolicyHttpMessageHandler (requestMessage => {
var relativeAddress = PlaceholderUri.MakeRelativeUri (requestMessage.RequestUri);
requestMessage.RequestUri = new Uri (primaryBaseAddress,relativeAddress);
return HttpPolicyExtensions.HandleTransientHttpError ()
.RetryAsync ((result,retryCount) =>
requestMessage.RequestUri = new Uri (failoverBaseAddress,relativeAddress));
}));
});
我想达到什么目的?
一般情况
我的客户可以使用主服务或故障转移服务。当主节点关闭时,使用故障转移直到主节点恢复正常。当两者都关闭时,我们会收到警报,并且可以通过机密管理器动态更改服务地址。
在代码中
现在我还想介绍一个 CircuitBreakerPolicy
并将这 2 个策略链接在一起。我正在寻找一种封装的配置,并且在客户端级别而不是在使用该客户端的类上处理故障。
场景说明
假设有一个断路器策略包含在单个客户端的重试策略中。
断路器配置为在 3 次尝试失败后暂时性错误断开电路60 秒在主基地址上。 OnBreak
- 地址从主要更改为故障转移。
重试策略配置为处理brokenCircuitException
,并重试一次,地址从主要更改为故障转移以继续.
- 请求主要地址 - 500 代码
- 请求主要地址 - 500 代码
- 对主地址的请求 - 500 代码(连续 3 次失败)
- 电路中断 60 秒
- 对主地址的请求 -
brokenCircuitException
被重试策略捕获,调用故障转移 - 对主地址的请求 -
brokenCircuitException
被重试策略捕获,调用故障转移 - 对主地址的请求 -
brokenCircuitException
被重试策略捕获,调用故障转移 - 对主地址的请求 -
brokenCircuitException
被重试策略捕获,调用故障转移 - (60 秒后)电路半开 -(这里可以再断开 60 秒或打开 - 假设打开)
- 请求主要地址 - 200 代码
如本 articles 中所述,有一个使用包含在回退中的断路器的解决方案,但正如您在那里看到的,默认和回退的逻辑是在类中实现的,而不是在客户端级别实现的。
我愿意
public class OpenExchangeRatesClient
{
private readonly HttpClient _client;
private readonly Policy _policy;
public OpenExchangeRatesClient(string apiUrl)
{
_client = new HttpClient
{
BaseAddress = new Uri(apiUrl),};
var circuitBreaker = Policy
.Handle<Exception>()
.CircuitBreakerAsync(
exceptionsAllowedBeforeBreaking: 2,durationOfBreak: TimeSpan.FromMinutes(1)
);
_policy = Policy
.Handle<Exception>()
.FallbackAsync(() => GetFallbackRates())
.Wrap(circuitBreaker);
}
public Task<ExchangeRates> GetLatestRates()
{
return _policy
.ExecuteAsync(() => CallRatesApi());
}
public Task<ExchangeRates> CallRatesApi()
{
//call the API,parse the results
}
public Task<ExchangeRates> GetFallbackRates()
{
// load the rates from the embedded file and parse them
}
}
改写为
public class OpenExchangeRatesClient
{
private readonly HttpClient _client;
public OpenExchangeRatesClient (IHttpClientFactory clientFactory) {
_client = clientFactory.CreateClient ("MyClient");
}
public Task<ExchangeRates> GetLatestRates () {
return _client.GetAsync ("/rates-gbp-usd");
}
}
我读了什么?
- 如何CircutBreakerWorks以及有什么用
- 可以包装策略,并且有一个 recommender order 用于包装
- Microsoft's example of Circuit breaker
- Other example of Circuit breaker
- Fallback Policy
我尝试了什么?
我尝试了几种不同的场景来链接和组合断路器策略与重试策略,以在启动文件中的客户端杠杆上实现所需的目标。最后一个状态如下。这些策略按照重试能够捕获 brokenCircuitException
的顺序进行包装,但事实并非如此。异常是在消费者类上抛出的,这不是想要的结果。虽然触发了RetryPolicy
,但是还是抛出了消费者类的异常。
var retryPolicy = GetRetryPolicy();
var circuitBreaker = GetCircuitBreakerPolicy();
var policyWraper = Policy.WrapAsync(retryPolicy,circuitBreaker);
services
.AddHttpClient("TestClient",client => client.BaseAddress = GetPrimaryUri())
.AddPolicyHandler(policyWraper);
static IAsyncPolicy<HttpResponseMessage> GetCircuitBreakerPolicy()
{
return HttpPolicyExtensions
.HandleTransientHttpError()
.CircuitBreakerAsync(
3,TimeSpan.FromSeconds(45),OnBreak,OnReset,OnHalfOpen);
}
static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
return Policy<HttpResponseMessage>
.Handle<Exception>()
.RetryAsync(1,(response,retryCount) =>
{
Debug.WriteLine("Retries on broken circuit");
});
}
我省略了方法 OnBreak
、OnReset
和 OnHalfOpen
,因为它们只是打印一些消息。
更新:添加了来自控制台的日志。
Circuit broken (after 3 attempts)
Retries on broken
Exception thrown: 'System.AggregateException' in System.Private.CoreLib.dll
Retries on broken circuit
Exception thrown: 'System.AggregateException' in System.Private.CoreLib.dll
'CircuitBreakerPolicy.exe' (CoreCLR: clrhost): 加载 'C:\Program 断路重试 抛出异常:System.Private.CoreLib.dll 中的“System.AggregateException”
更新 2:为使用配置了策略的客户端的类添加了参考 URL
更新 3:project 已更新,因此 WeatherService2.Get
的实现以所需的方式工作:当主服务不可用时,电路中断,使用故障转移服务,直到电路可用关闭。这将是这个问题的答案,但是我想探索一种解决方案,在该解决方案中,使用 WeatherService.Get
以及 Startup
上的适当策略和客户端设置可以实现相同的结果。 >
在上面的日志中可以看到断路器抛出的 Exception thrown: 'System.AggregateException' in System.Private.CoreLib.dll
- 这是意料之中的,因为有重试包裹断路器。
解决方法
我已经下载了您的项目并试用了它,以下是我的观察:
阻塞与非阻塞
- 因为您的代码使用了阻塞异步调用 (
.Result
),所以您会看到AggregateException
public IEnumerable<WeatherForecast> Get()
{
HttpResponseMessage response = null;
try
{
response = _client.GetAsync(string.Empty).Result; //AggregateException
}
catch (Exception e)
{
Debug.WriteLine($"{e.Message}");
}
...
}
- 为了解开
InnerException
的AggregateException
,您需要使用await
public async Task<IEnumerable<WeatherForecast>> Get()
{
HttpResponseMessage response = null;
try
{
response = await _client.GetAsync(string.Empty); //BrokenCircuitException
}
catch (Exception e)
{
Debug.WriteLine($"{e.Message}");
}
...
}
升级
每当您将一项政策包装到另一个政策中时,都可能会发生升级。这意味着如果内部无法处理问题,那么它会将相同的问题传播到外部,后者可能会也可能无法处理。如果最外层没有处理问题,那么(大多数情况下)原始异常将被抛给弹性策略(策略组合)的使用者。
您可以在此处找到有关 escalation 的更多详细信息。
让我们在您的案例中回顾一下这个概念:
var policyWrapper = Policy.WrapAsync(retryPolicy,circuitBreaker);
static IAsyncPolicy<HttpResponseMessage> GetCircuitBreakerPolicy()
{
return HttpPolicyExtensions
.HandleTransientHttpError()
.CircuitBreakerAsync(3,TimeSpan.FromSeconds(45),...);
}
static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
return Policy<HttpResponseMessage>
.Handle<Exception>()
.RetryAsync(1,...);
}
- 针对
https://httpstat.us/500
发出初始请求(1. 尝试) - 返回 500,这会将连续瞬态故障从 0 增加到 1
- CB 将问题上报以重试
- 重试未处理状态 500,因此不会触发重试
- httpClient 返回带有
HttpResponseMessage
状态代码的InternalServerError
。
让我们修改重试策略以处理瞬态 http 错误:
static IAsyncPolicy<HttpResponseMessage> GetCircuitBreakerPolicy()
{
return HttpPolicyExtensions
.HandleTransientHttpError()
.CircuitBreakerAsync(3,...);
}
static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
return HttpPolicyExtensions
.HandleTransientHttpError()
.Or<Exception>()
.RetryAsync(1,...);
}
- 针对
https://httpstat.us/500
发出初始请求(1. 尝试) - 返回 500,这会将连续瞬态故障从 0 增加到 1
- CB 将问题上报以重试
- 重试正在处理状态 500,因此重试会立即发出另一次尝试
- 针对
https://httpstat.us/500
发出第 1 次重试请求(2. 尝试) - 它返回 500,这会将连续瞬态故障从 1 增加到 2
- CB 将问题上报以重试
- 即使重试正在处理状态 500,它也不会触发,因为它达到了重试计数 (1)
- httpClient 返回带有
HttpResponseMessage
状态代码的InternalServerError
。
现在,让我们将连续失败计数从 3 降低到 1 并明确处理 BrokenCircuitException
:
static IAsyncPolicy<HttpResponseMessage> GetCircuitBreakerPolicy()
{
return HttpPolicyExtensions
.HandleTransientHttpError()
.CircuitBreakerAsync(1,...);
}
static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
return HttpPolicyExtensions
.HandleTransientHttpError()
.Or<BrokenCircuitException>()
.RetryAsync(1,...);
}
- 针对
https://httpstat.us/500
发出初始请求(1. 尝试) - 返回 500,这会将连续瞬态故障从 0 增加到 1
- 断路器打开,因为它达到了预定义的阈值
- CB 将问题上报以重试
- 重试正在处理状态 500,因此重试会立即发出另一次尝试
- 针对
https://httpstat.us/500
发出第 1 次重试请求(2. 尝试) - CB 阻止此调用,因为它已损坏
- CB 抛出一个
BrokenCircuitException
- 即使 Retry 正在处理
BrokenCircuitException
它也不会触发,因为它达到了它的 retrycount (1) - 重试会抛出原始异常 (
BrokenCircuitException
),因此 httpClient 的GetAsync
会抛出该异常。
最后让我们将 retryCount 从 1 增加到 2:
static IAsyncPolicy<HttpResponseMessage> GetCircuitBreakerPolicy()
{
return HttpPolicyExtensions
.HandleTransientHttpError()
.CircuitBreakerAsync(1,...);
}
static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
return HttpPolicyExtensions
.HandleTransientHttpError()
.Or<BrokenCircuitException>()
.RetryAsync(2,...);
}
- 针对
https://httpstat.us/500
发出初始请求(1. 尝试) - 返回 500,这会将连续瞬态故障从 0 增加到 1
- 断路器打开,因为它达到了预定义的阈值
- CB 将问题上报以重试
- 重试正在处理状态 500,因此重试会立即发出另一次尝试
- 针对
https://httpstat.us/500
发出第 1 次重试请求(2. 尝试) - CB 阻止此调用,因为它已损坏
- CB 抛出一个
BrokenCircuitException
- 重试正在处理
BrokenCircuitException
并且它没有超过其 retryCount,因此它立即发出另一次尝试 - 针对
https://httpstat.us/500
发出第二次重试请求(3. 尝试) - CB 阻止此调用,因为它已损坏
- CB 抛出一个
BrokenCircuitException
- 即使 Retry 正在处理
BrokenCircuitException
它也不会触发,因为它达到了它的 retrycount (2) - 重试将抛出原始异常 (
BrokenCircuitException
),因此 httpClient 的GetAsync
将抛出该异常。
我希望这个练习能帮助您更好地理解如何创建弹性策略,通过升级问题来组合多个策略。
,我已经查看了您的替代解决方案,该解决方案与我在上一篇博文中讨论的设计问题相同。
public WeatherService2(IHttpClientFactory clientFactory,IEnumerable<IAsyncPolicy<HttpResponseMessage>> policies)
{
_primaryClient = clientFactory.CreateClient("PrimaryClient");
_failoverClient = clientFactory.CreateClient("FailoverClient");
_circuitBreaker = policies.First(p => p.PolicyKey == "CircuitBreaker");
_policy = Policy<HttpResponseMessage>
.Handle<Exception>()
.FallbackAsync(_ => CallFallbackForecastApi())
.WrapAsync(_circuitBreaker);
}
public async Task<string> Get()
{
var response = await _policy.ExecuteAsync(async () => await CallForecastApi());
if (response.IsSuccessStatusCode)
return response.StatusCode.ToString();
response = await CallFallbackForecastApi();
return response.StatusCode.ToString();
}
您的后备策略永远不会被触发。
- HttpClient 收到状态码为 500 的响应
- 断路器中断
- CB 将带有 statusCode 500 的
HttpResponseMessage
传播到外部策略 - 回退不会触发,因为它是为异常设置的
Handle<Exception>()
- 策略返回状态代码为 500 的
HttpResponseMessage
- 您的代码手动检查响应,然后手动调用回退。
如果您将政策更改为:
_policy = Policy
.HandleResult<HttpResponseMessage>(response => response != null && !response.IsSuccessStatusCode)
.Or<Exception>()
.FallbackAsync(_ => CallFallbackForecastApi())
.WrapAsync(_circuitBreaker);
那么就不需要手动回退。
- HttpClient 收到状态码为 500 的响应
- 断路器中断
- CB 将带有 statusCode 500 的
HttpResponseMessage
传播到外部策略 - 回退触发,因为它也设置为不成功的状态代码
- HttpClient 收到 statusCode 200 的响应
- 策略返回状态代码为 500 的
HttpResponseMessage
您还需要了解一件更重要的事情。前面的代码仅有效,因为您已在没有断路器策略的情况下注册了 HttpClient。
这意味着 CB 未附加到 HttpClient。因此,如果您像这样更改代码:
public async Task<HttpResponseMessage> CallForecastApi()
=> await _primaryClient.GetAsync("https://httpstat.us/500/");
public async Task<HttpResponseMessage> CallFallbackForecastApi()
=> await _primaryClient.GetAsync("https://httpstat.us/200/");
那么即使第一次尝试后断路器将打开,CallFallbackForecastApi
也不会抛出 BrokenCircuitException
。
但是,如果您像这样将 CB 附加到 HttpClient:
services
.AddHttpClient("PrimaryClient",client => client.BaseAddress = PlaceholderUri)
...
.AddPolicyHandler(GetCircuitBreakerPolicy());
然后像这样简化 WeatherService2
:
private readonly HttpClient _primaryClient;
private readonly IAsyncPolicy<HttpResponseMessage> _policy;
public WeatherService2(IHttpClientFactory clientFactory)
{
_primaryClient = clientFactory.CreateClient("PrimaryClient");
_policy = Policy
.HandleResult<HttpResponseMessage>(response => response != null && !response.IsSuccessStatusCode)
.Or<Exception>()
.FallbackAsync(_ => CallFallbackForecastApi());
}
然后它会以 BrokenCircuitException
惨遭失败。
如果您的 WeatherService2
如下所示:
public class WeatherService2 : IWeatherService2
{
private readonly HttpClient _primaryClient;
private readonly HttpClient _secondaryClient;
private readonly IAsyncPolicy<HttpResponseMessage> _policy;
public WeatherService2(IHttpClientFactory clientFactory)
{
_primaryClient = clientFactory.CreateClient("PrimaryClient");
_secondaryClient = clientFactory.CreateClient("FailoverClient");
_policy = Policy
.HandleResult<HttpResponseMessage>(response => response != null && !response.IsSuccessStatusCode)
.Or<Exception>()
.FallbackAsync(_ => CallFallbackForecastApi());
}
public async Task<string> Get()
{
var response = await _policy.ExecuteAsync(async () => await CallForecastApi());
return response.StatusCode.ToString();
}
public async Task<HttpResponseMessage> CallForecastApi()
=> await _primaryClient.GetAsync("https://httpstat.us/500/");
public async Task<HttpResponseMessage> CallFallbackForecastApi()
=> await _secondaryClient.GetAsync("https://httpstat.us/200/");
}
那么它可以正常工作只有PrimaryClient
和FailoverClient
具有不同的断路器。
services
.AddHttpClient("PrimaryClient",client => client.BaseAddress = PlaceholderUri)
...
.AddPolicyHandler(GetCircuitBreakerPolicy());
services
.AddHttpClient("FailoverClient",client => client.BaseAddress = PlaceholderUri)
...
.AddPolicyHandler(GetCircuitBreakerPolicy());
如果他们共享同一个断路器,那么第二次调用将再次失败并显示 BrokenCircuitException
。
var cbPolicy = GetCircuitBreakerPolicy();
services
.AddHttpClient("PrimaryClient",client => client.BaseAddress = PlaceholderUri)
...
.AddPolicyHandler(cbPolicy);
services
.AddHttpClient("FailoverClient",client => client.BaseAddress = PlaceholderUri)
...
.AddPolicyHandler(cbPolicy);