RabbitMQ 捕获队列长度异常

问题描述

背景信息

我有一个包含三个微服务的系统。

一个被称为 powershell 脚本,它通知系统数据已准备好从客户 1 的数据库提取。这被插入到队列 1 中。

当第二个微服务获得准备就绪的信息时,它正在侦听队列一。它连接到数据库并开始用需要发送的数据填充队列 2。

第三个微服务正在列到队列 2 中,并开始将排队的数据发送到第三方网关。 (在我的系统上会有一个限制,我可以向网关发出多少请求,我还没有被告知这是什么,因为网关服务器仍然处于开发阶段。)

我的问题是队列 2 可能由于某些原因而过载

  1. 服务客户立即准备就绪。
  2. 网关关闭,队列未清空。

所以我试图确保如果队列 2 已满,第二个微服务会回到队列 1,它需要重试。我担心的是,如果微服务两个 nack 回到队列一个(不删除它),它会立即再次尝试。我需要在这里实施某种退避。我想知道我是否应该在微服务 2 中实现指数回退,而不是将其回退。但是在某些时候,如果网关关闭了几天,我需要停止它我不确定这是否应该永远继续尝试。

我已经开始执行一些检查,在队列 2 开始加载之前检查它有多少空间。

我以前使用过 RabbitMQ,但我从来没有做过所有设置。

我做了什么

我已将队列配置为 10 个非常低的限制,我正在尝试测试 Polly 重试和断路器。

queue properties

如您所见,其最大长度为 10。

var args = new Dictionary<string,object>
            {
                {"x-max-length",_queueSettings.MaxLength},{"x-overflow","reject-publish"},{"x-max-length-bytes",_queueSettings.MaxLengthInBytes}
            };
using var channel = _connection.CreateModel();
channel.QueueDeclare(queue: _queueSettings.Name,durable: _queueSettings.Durable,exclusive: false,autoDelete: false,arguments: args);

问题是当它运行时它不会失败。它只是没有向队列中添加任何内容,并且所有内容都可能只是被转储到死信中。我需要它返回到我的应用程序,以便我可以将它设置为停止发送(断路器),直到队列被清除,以便我可以再次添加内容。否则我只会失去请求。

不带波莉发送

channel.ConfirmSelect();
// uses a 5 second timeout
channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
            
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2; // persistent

_logger.LogTrace("Publishing report to RabbitMQ: {RecordId}",@jsonReportData.RecordId);

channel.Basicpublish(exchange: "",routingKey: _queueSettings.Name,basicProperties: properties,body: body);

与波莉一起发送

var policy = RetryPolicy
    .Handle<OperationInterruptedException>()
    .WaitAndRetry(1,retryAttempt => TimeSpan.FromSeconds(Math.Pow(2,retryAttempt)),(ex,time) =>
        {
            _logger.LogWarning(ex,"Could not publish RecordId: {RecordId} after {Timeout}s ({ExceptionMessage})",@jsonReportData.RecordId,$"{time.TotalSeconds:n1}",ex.Message);
        });

channel.ConfirmSelect();
// uses a 5 second timeout
channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));

policy.Execute(() =>
{
    var properties = channel.CreateBasicProperties();
    properties.DeliveryMode = 2; // persistent

    _logger.LogTrace("Publishing report to RabbitMQ: {RecordId}",@jsonReportData.RecordId);

    channel.Basicpublish(exchange: "",body: body);
});

我发誓它在我上次尝试添加最大长度时抛出了一个异常,但现在它只是停在 10 并且忽略了其他。

我也尝试添加一个侦听器,但在发布到队列后我没有从 Rabbit 获取任何信息也无济于事。

channel.ConfirmSelect();
channel.BasicAcks += (sender,ea) =>
{
    // code when message is confirmed
    int i = 1;
};
channel.BasicNacks += (sender,ea) =>
{
    int i = 1;
};

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)