问题描述
我是 RabbitMQ 的新手,我使用 Publisher Confirm 来确保消息成功传递。
但是我面临着一个奇怪的行为,那就是当我发布一条消息并获取一个 sequenceNumber 函数通道时。BasicAcks 被触发 n 次,其中 n 是 DeliveryTag,所以如果此消息的 DeliveryTag 是 5 函数通道.BasicAcks 被解雇了 5 次?!!
这是我的代码:
IModel channel = _customrabbitMQ.GetChannel();
IBasicProperties properties = _customrabbitMQ.GetBasicProperties();
ulong sequenceNumber = channel.NextPublishSeqNo;
message.SequenceNumber = sequenceNumber.ToString();
_context.Messages.Update(message);
await _context.SaveChangesAsync();
try
{
_customrabbitMQ.AddOutstandingConfirm(sequenceNumber,message.Id);
channel.Basicpublish(message.ExchangeName,message.RoutingKey,properties,Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message)));
Console.WriteLine("Message Published");
}
catch (Exception ex)
{
// Todo: log to custom logger here
Console.WriteLine($"Error => {ex.Message}");
}
channel.BasicAcks += async (sender,ea) =>
{
try
{
Console.WriteLine("Message Confirmed");
using var scope = _provider.CreateScope();
var _context = scope.ServiceProvider.GetService<Data.DataContext>();
var _customrabbitMQ = scope.ServiceProvider.GetService<CustomrabbitMQ>();
Guid messageId = _customrabbitMQ.GetoutstandingConfirm(ea.DeliveryTag);
Message message = await _context.Messages.Where(m => m.Id == messageId).FirstOrDefaultAsync();
message.Status = MessageStatuses.INQUEUE;
_context.Messages.Update(message);
await _context.SaveChangesAsync();
_customrabbitMQ.RemoveOutstandingConfirm(ea.DeliveryTag);
}
catch (Exception ex)
{
Console.WriteLine($"Error => {ex.Message}");
}
};
channel.BasicNacks += (sender,ea) =>
{
Console.WriteLine("Message Not Confirmed");
_customrabbitMQ.RemoveOutstandingConfirm(ea.DeliveryTag);
};
}
那么,为什么会发生这种情况以及如何阻止它并使其仅确认一次?
提前致谢。
解决方法
我认为问题在于每次发布消息时都会初始化一个侦听器。
因此,当您使用 DT = 1
发布消息时,您通过添加 channel.BasicAck += fun
创建了一个侦听器,而当使用 DT = 2
添加另一条消息时,您创建了另一个侦听器,因此当 DT_2 上的确认返回时将被推送给 2 个侦听器,这就解释了为什么您会收到具有相同数量的交付标签的确认。