RabbitMQ:发布者确认中的奇怪行为

问题描述

我是 RabbitMQ 的新手,我使用 Publisher Confirm 来确保消息成功传递。

但是我面临着一个奇怪的行为,那就是当我发布一条消息并获取一个 sequenceNumber 函数通道时。BasicAcks 被触发 n 次,其中 n 是 DeliveryTag,所以如果此消息的 DeliveryTag 是 5 函数通道.BasicAcks 被解雇了 5 次?!!

这是我的代码

            IModel channel = _customrabbitMQ.GetChannel();
            IBasicProperties properties = _customrabbitMQ.GetBasicProperties();
            ulong sequenceNumber = channel.NextPublishSeqNo;

         
            message.SequenceNumber = sequenceNumber.ToString();
            _context.Messages.Update(message);
            await _context.SaveChangesAsync();

            try
            {
                _customrabbitMQ.AddOutstandingConfirm(sequenceNumber,message.Id);
                channel.Basicpublish(message.ExchangeName,message.RoutingKey,properties,Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message)));
                Console.WriteLine("Message Published");
            }
            catch (Exception ex)
            {
                // Todo: log to custom logger here
                Console.WriteLine($"Error => {ex.Message}");
            }

            channel.BasicAcks += async (sender,ea) =>
            {
                try
                {
                    Console.WriteLine("Message Confirmed");

                    using var scope = _provider.CreateScope();
                    var _context = scope.ServiceProvider.GetService<Data.DataContext>();
                    var _customrabbitMQ = scope.ServiceProvider.GetService<CustomrabbitMQ>();

                    Guid messageId = _customrabbitMQ.GetoutstandingConfirm(ea.DeliveryTag);

                    Message message = await _context.Messages.Where(m => m.Id == messageId).FirstOrDefaultAsync();
                    message.Status = MessageStatuses.INQUEUE;
                    _context.Messages.Update(message);
                    await _context.SaveChangesAsync();

                    _customrabbitMQ.RemoveOutstandingConfirm(ea.DeliveryTag);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Error => {ex.Message}");
                }
            };

            channel.BasicNacks += (sender,ea) =>
            {
                Console.WriteLine("Message Not Confirmed");
                _customrabbitMQ.RemoveOutstandingConfirm(ea.DeliveryTag);
            };
        }

那么,为什么会发生这种情况以及如何阻止它并使其仅确认一次?

提前致谢。

解决方法

我认为问题在于每次发布消息时都会初始化一个侦听器。 因此,当您使用 DT = 1 发布消息时,您通过添加 channel.BasicAck += fun 创建了一个侦听器,而当使用 DT = 2 添加另一条消息时,您创建了另一个侦听器,因此当 DT_2 上的确认返回时将被推送给 2 个侦听器,这就解释了为什么您会收到具有相同数量的交付标签的确认。