关于实现服务总线会话块的建议

问题描述

当前,我们有一个具有后台服务的dotnet核心应用,该服务从启用了会话的服务总线接收消息,其中sessionIduserId,并且消息包含用户信息的更新。现在,我们想实现一个功能,通过阻止特定的userId/sessionId来临时暂停对特定用户的更新,但是在取消阻止时仍按顺序处理消息。解决这个问题的最佳方法是什么?

我试图浏览服务总线文档和示例。主要是message deferralmessage session statesession state sample

我发现了有关SessionState和消息延迟的一些信息,我想知道它们是否可用于实现此功能并仍然保证处理顺序(无论是否延迟消息,都是FIFO)。我正在考虑尝试将序列号存储在会话状态中,并继续通过该序列号接收延迟的消息,并递增该序列号以接收下一条消息,直到我用完消息为止。

当前,我们的代码如下所示:

            this.queue.RegisterSessionHandler(
                this.SessionHandler,new SessionHandlerOptions(this.ExceptionHandler)
                {
                    AutoComplete = false,MessageWaitTimeout = TimeSpan.FromMinutes(1),});

其中this.SessionHandler是处理消息然后通过调用session.CompleteAsyncsession.CloseAsync来完成和关闭会话的函数。但是,我在构思如何将延迟逻辑添加到我们的代码时遇到了麻烦。因为当前,RegisterSessionHandler已经处理了会话锁,并使用sessionId对消息进行负载均衡(我认为),这很好。但是RegisterSessionHandler也不允许您指定要处理的特定sessionId

我有几条关于userId/sessionId: A的消息。当我想取消对此用户的处理时,我不能简单地将延迟的消息插入回队列。由于发件人仍会不断向用户A发送消息到队列,这会弄乱订单。

我上面提到的会话状态示例一个很好的示例,说明了如何使用会话状态和处理延迟的消息。但是,它仅使用一个sessionId,而不使用RegisterSessionHandler。我的问题是:如果要实现延迟消息处理逻辑(保留顺序),是否必须实现自己的RegisterSessionHandler并处理sessionId负载均衡?

谢谢!

解决方法

您应该在QueueClient中使用SessionClient而不是RegisterSessionHandler来更好地处理延迟方案并保留顺序。您可以在邮件正文中维护一些步骤/序列号。当您实际处理消息时,还要添加LastProcessedStep / Seqence。 Session state允许跟踪处理程序与会话相关的处理状态,从而使客户端可以在会话处理期间在处理节点之间灵活切换(包括故障转移)。 sample通过维护该延迟消息来处理(步骤)。它结合了Deferral和Session功能,以便使用会话状态工具来跟踪工作流的处理状态,在该状态下,各个步骤的输入都超出了预期的顺序。请注意,发送方代码还演示了通过以不可预测的顺序发送消息,但是凭借会话状态,接收方可以检测到顺序。

//   
//   Copyright © Microsoft Corporation,All Rights Reserved
// 
//   Licensed under the Apache License,Version 2.0 (the "License"); 
//   you may not use this file except in compliance with the License. 
//   You may obtain a copy of the License at
// 
//   http://www.apache.org/licenses/LICENSE-2.0 
// 
//   THIS CODE IS PROVIDED *AS IS* BASIS,WITHOUT WARRANTIES OR CONDITIONS
//   OF ANY KIND,EITHER EXPRESS OR IMPLIED,INCLUDING WITHOUT LIMITATION
//   ANY IMPLIED WARRANTIES OR CONDITIONS OF TITLE,FITNESS FOR A
//   PARTICULAR PURPOSE,MERCHANTABILITY OR NON-INFRINGEMENT.
// 
//   See the Apache License,Version 2.0 for the specific language
//   governing permissions and limitations under the License. 

namespace SessionState
{
    using Microsoft.Azure.ServiceBus;
    using Microsoft.Azure.ServiceBus.Core;
    using Newtonsoft.Json;
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;

    public class Program : MessagingSamples.Sample
    {
        public async Task Run(string connectionString)
        {
            Console.WriteLine("Press any key to exit the scenario");

            var sendTask = this.SendMessagesAsync(Guid.NewGuid().ToString(),connectionString,SessionQueueName);
            var sendTask2 = this.SendMessagesAsync(Guid.NewGuid().ToString(),SessionQueueName);
            var receiveTask = this.ReceiveMessagesAsync(connectionString,SessionQueueName);

            await Task.WhenAll(sendTask,sendTask2,receiveTask);
        }

        async Task SendMessagesAsync(string session,string connectionString,string queueName)
        {
            var sender = new MessageSender(connectionString,queueName);


            Console.WriteLine("Sending messages to Queue...");

            ProcessingState[] data = new[]
            {
                new ProcessingState {Step = 1,Title = "Buy"},new ProcessingState {Step = 2,Title = "Unpack"},new ProcessingState {Step = 3,Title = "Prepare"},new ProcessingState {Step = 4,Title = "Cook"},new ProcessingState {Step = 5,Title = "Eat"},};

            var rnd = new Random();
            var tasks = new List<Task>();
            for (int i = 0; i < data.Length; i++)
            {
                var message = new Message(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(data[i])))
                {
                    SessionId = session,ContentType = "application/json",Label = "RecipeStep",MessageId = i.ToString(),TimeToLive = TimeSpan.FromMinutes(2)
                };

                tasks.Add(Task.Delay(rnd.Next(30)).ContinueWith(
                      async (t) =>
                      {
                          await sender.SendAsync(message);
                          lock (Console.Out)
                          {
                              Console.ForegroundColor = ConsoleColor.Yellow;
                              Console.WriteLine("Message sent: Id = {0}",message.MessageId);
                              Console.ResetColor();
                          }
                      }));
            }
            await Task.WhenAll(tasks);
        }

        async Task ReceiveMessagesAsync(string connectionString,string queueName)
        {
            var client = new SessionClient(connectionString,queueName,ReceiveMode.PeekLock);

            while (true)
            {
                var session = await client.AcceptMessageSessionAsync();
                await Task.Run(
                    async () =>
                    {
                        ProcessingState processingState;

                        var stateData = await session.GetStateAsync();
                        if (stateData != null)
                        {
                            processingState = JsonConvert.DeserializeObject<ProcessingState>(Encoding.UTF8.GetString(stateData));
                        }
                        else
                        {
                            processingState = new ProcessingState
                            {
                                LastProcessedRecipeStep = 0,DeferredSteps = new Dictionary<int,long>()
                            };
                        }

                        while (true)
                        {
                            try
                            {
                                //receive messages from Queue
                                var message = await session.ReceiveAsync(TimeSpan.FromSeconds(5));
                                if (message != null)
                                {
                                    if (message.Label != null &&
                                        message.ContentType != null &&
                                        message.Label.Equals("RecipeStep",StringComparison.InvariantCultureIgnoreCase) &&
                                        message.ContentType.Equals("application/json",StringComparison.InvariantCultureIgnoreCase))
                                    {
                                        var body = message.Body;

                                        ProcessingState recipeStep = JsonConvert.DeserializeObject<ProcessingState>(Encoding.UTF8.GetString(body));
                                        if (recipeStep.Step == processingState.LastProcessedRecipeStep + 1)
                                        {
                                            lock (Console.Out)
                                            {
                                                Console.ForegroundColor = ConsoleColor.Cyan;
                                                Console.WriteLine(
                                                    "\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = {0},\n\t\t\t\t\t\tSequenceNumber = {1},\n\t\t\t\t\t\tEnqueuedTimeUtc = {2}," +
                                                    "\n\t\t\t\t\t\tExpiresAtUtc = {5},\n\t\t\t\t\t\tContentType = \"{3}\",\n\t\t\t\t\t\tSize = {4},\n\t\t\t\t\t\tContent: [ step = {6},title = {7} ]",message.MessageId,message.SystemProperties.SequenceNumber,message.SystemProperties.EnqueuedTimeUtc,message.ContentType,message.Size,message.ExpiresAtUtc,recipeStep.Step,recipeStep.Title);
                                                Console.ResetColor();
                                            }
                                            await session.CompleteAsync(message.SystemProperties.LockToken);
                                            processingState.LastProcessedRecipeStep = recipeStep.Step;
                                            await
                                                session.SetStateAsync(
                                                    Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(processingState)));
                                        }
                                        else
                                        {
// in your case,if customer update is blocked,you can defer
                                            processingState.DeferredSteps.Add((int)recipeStep.Step,(long)message.SystemProperties.SequenceNumber);
                                            await session.DeferAsync(message.SystemProperties.LockToken);
                                            await session.SetStateAsync(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(processingState)));
                                        }
                                    }
                                    else
                                    {
                                        await session.DeadLetterAsync(message.SystemProperties.LockToken);//,"ProcessingError","Don't know what to do with this message");
                                    }
                                }
                                else
                                {
                                    while (processingState.DeferredSteps.Count > 0)
                                    {
                                        long step;

                                        if (processingState.DeferredSteps.TryGetValue(processingState.LastProcessedRecipeStep + 1,out step))
                                        {
                                            var deferredMessage = await session.ReceiveDeferredMessageAsync(step);
                                            var body = deferredMessage.Body;
                                            ProcessingState recipeStep = JsonConvert.DeserializeObject<ProcessingState>(Encoding.UTF8.GetString(body));
                                            lock (Console.Out)
                                            {
                                                Console.ForegroundColor = ConsoleColor.Cyan;
                                                Console.WriteLine(
                                                    "\t\t\t\tdeferredMessage received: \n\t\t\t\t\t\tMessageId = {0},deferredMessage.MessageId,deferredMessage.SystemProperties.SequenceNumber,deferredMessage.SystemProperties.EnqueuedTimeUtc,deferredMessage.ContentType,deferredMessage.Size,deferredMessage.ExpiresAtUtc,recipeStep.Title);
                                                Console.ResetColor();
                                            }
                                            await session.CompleteAsync(deferredMessage.SystemProperties.LockToken);
                                            processingState.LastProcessedRecipeStep = processingState.LastProcessedRecipeStep + 1;
                                            processingState.DeferredSteps.Remove(processingState.LastProcessedRecipeStep);
                                            await session.SetStateAsync(Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(processingState)));
                                        }
                                    }
                                    break;
                                }
                            }
                            catch (ServiceBusException e)
                            {
                                if (!e.IsTransient)
                                {
                                    Console.WriteLine(e.Message);
                                    throw;
                                }
                            }
                        }
                        await session.CloseAsync();
                    });
            }
        }

       public static int Main(string[] args)
        {
            try
            {
                var app = new Program();
                app.RunSample(args,app.Run);
            }
            catch (Exception e)
            {
                Console.WriteLine(e.ToString());
                return 1;
            }
            return 0;
        }

        class ProcessingState
        {
            [JsonProperty]
            public int LastProcessedRecipeStep { get; set; }
            [JsonProperty]
            public Dictionary<int,long> DeferredSteps { get; set; }
            [JsonProperty]
            public int Step { get; internal set; }
            [JsonProperty]
            public string Title { get; internal set; }
        }
    }
}

您也可以遵循Ordering Messages in Azure Service Bus,它对概念进行了很好的解释。但是那里提供的示例与上面的稍有不同。

警告:使用消息会话也意味着一个会话(对于您的情况是用户ID),该会话中的消息将始终由单个接收者接收和处理。因此,在设置Session和SessionId时要谨记。如果创建一个非常大的会话,这将迫使Azure Service Bus将大多数消息发送到一个订阅服务器,从而降低了多线程的好处。如果将会话设置得过于精细,那么它将失去其预期的好处,而您只是在增加不必要的开销。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...