为什么Peek和Receive从服务总线队列返回不同的消息?

问题描述

我正在使用Peek()方法从Azure Service Bus队列查看消息。

然后我根据我的应用程序级别条件“延迟”消息。以下是我的操作的摘要

msgClient = clientMessagingFactory.CreateQueueClient(QueueClient.FormatDeadLetterPath(sourceQueue.Name),ReceiveMode.PeekLock);
message = msgClient.Peek(); // Got a message
if (message.MessageId == "xxx")
{
   if(message.State == MessageState.Active)
   {
      brokeredMessage _message = null;
      _message = msgClient.Receive(); // Getting a different message though there are no more than 1 listener :(
      if (_message != null){
         _message.Defer();
         _message.Abandon();
      }
   }
   else
   {
     // Perform some operation with deferred state message
   }
}

此处,msgClient.Peek()和msgClient.Receive()返回不同的消息。因此,我陷入了推迟传递错误信息的境地。我该如何解决

解决方法

最可能的原因是由于多个并发接收者在窥视和接收之间,另一个使用者调用receive()渲染当前的接收来代替接收下一条消息。将单个侦听器与未分区的队列一起使用时,您的代码按我的预期工作。另一个原因可能是队列是“分区的”,可以在其中窥视并接收来自不同片段的邮件。

按如下所示修改您的代码应该可以在并发竞争的使用者场景中解决您的问题。找到内嵌评论进行解释。

var msgClient = clientMessagingFactory.CreateQueueClient(QueueClient.FormatDeadLetterPath(sourceQueue.Name),ReceiveMode.PeekLock);
var message = msgClient.Receive();
if (message != null)
{
   // we got here means,there was one active message found and now message is locked from others view
   if (message.MessageId == "xxx")
   {
      //with id 'xxx',defer
      message.Defer();
   }
   // regardless of condition match,abandoning   
   message.Abandon();
}
else
{
   // got here means there was no active Message above
   // now see if we have deferred Message
   var peeked = msgClient.Peek();
   // The reason of checking state here again,in case there was new message arrived in the queue by now
   if (peeked != null && peeked.MessageId == "xxx" && peeked.State == MessageState.Deferred)
   {
     try
     {
       // now receive it to lock 
       // yes,you can receive 'deferred' message by sequence number,but not 'active' message :)
       message = msgClient.Receive(peeked.SequenceNumber);

       // Perform some operation with deferred state message

       message.Complete(); // don't forget it
     }
     catch (MessageNotFoundException)
     {
       // just in case message was peeked by competing consumer and received
     }
   }
}

注意:在非分区实体上的peek操作始终返回最旧的消息,但不在分区实体上返回。而是在消息代理首先响应的分区中返回最旧的消息。无法保证返回的消息是所有分区中最早的消息。