SQL Server Service Broker-改进SQL执行框架的方法 服务代理设计处理查询

问题描述

下面是我一直在使用的Service broker进行sql执行框架设计的概述。我已经概述了过程,并提出了一些问题(使用引号突出显示),并且有兴趣听取有关设计的任何建议。

概述

我有一个ETL操作,需要从5个数据库中取出数据,然后使用select/insert语句或stored procedures将其移至150个数据库中。结果是大约2,000个独立查询,每个查询耗时1到1个小时。

每个SQL查询仅插入数据。无需返回数据。

该操作可以分为3个步骤:

  • ETL之前的
  • ETL
  • ETL后

每个步骤中的查询都可以按任意顺序执行,但是步骤必须保持顺序。

方法

我正在使用Service broker进行异步/并行执行。

关于如何调整服务代理的任何建议(例如,要查看的任何特定选项或用于设置队列工作者数量的指南?

服务代理设计

启动器

initiator将包含SQL查询XML消息与称为Unprocessed的激活存储过程一起发送到ProcessUnprocessedQueue队列。此过程包装在事务中的try/catch中,在出现异常时回滚事务。

ProcessUnpressedQueue

ProcessUnprocessedQueue将XML传递到过程Execsql

Execsql-sql执行和记录

Execsql然后处理sql执行和日志记录:

  • XML和将要记录的有关执行的任何其他数据一起被解析
  • 在执行之前,将插入一个日志记录条目
    • 如果事务在initiator中启动,如果回退initiator中的外部事务,是否可以确保始终提交日志条目插入?

    • SAVE TRANSACTION之类的东西在这里无效,对吗?

    • 我应该不要在这里操作事务,在try/catch中执行查询,如果要进行捕获,则为异常插入日志条目,然后throw例外,因为它在交易中间?

  • 查询已执行
替代日志记录解决方案?

我需要登录

  • 已执行的SQL查询
  • 有关操作的元数据
  • 每个过程完成所需的时间
    • 这就是为什么我在流程的开头插入一行,并在流程结束时插入一行
    • 的原因
  • 任何异常(如果存在)

最好有一个包含查询信息的In-Memory OLTP表?因此,我将在操作开始前一行INSERT,然后执行UPDATEINSERT来记录异常和执行时间。批处理完成后,我将数据存档到存储在磁盘中的表中,以防止表变得太大。

ProcessUnprocessedQueue-手动处理结果

执行后,ProcessUnprocessedQueue返回XML的更新版本(以确定执行是否成功,或有关事务的其他数据以进行后处理),然后发送该消息到ProcessedQueue没有没有激活过程,因此可以手动进行处理(我需要知道一批查询何时完成执行)。

处理查询

由于ETL可以分为3个步骤,因此我创建了3个XML变量,在其中添加了ETL操作所需的所有查询,因此我将得到以下内容

  • @preEtlQueue xml
  • @etlQueue xml
  • @postEtlQueue xml

为什么使用XML?

XML队列变量作为OUTPUT参数在不同的存储过程之间传递,该参数更新其值和/或向其添加SQL查询。该变量需要写入和读取,因此替代方案可以是诸如全局临时表或持久性表之类的东西。

然后我处理XML变量:

  • 使用cursor循环查询,并将其发送到服务代理服务。
    • XML变量中包含的每组查询都在同一conversation_group_id下发送。
    • 诸如“ to / from服务”,“消息类型”之类的值都存储在XML变量中。
  • 将消息发送到Service broker之后,使用while循环连续检查ProcessedQueue,直到处理完所有消息为止。
    • 这实现了超时以避免无限循环
    • 我正在考虑重新设计它。我应该在ProcessedQueue添加一个激活过程,然后使该过程将处理后的结果插入物理表中吗?如果这样做,将无法使用RECEIVE而不是WHILE循环来检查已处理的项目。那有什么缺点吗?

解决方法

我还没有建立起您现在正在做的任何事情,但是我会为您提供对我有用的东西,以及一些一般性意见...

  • 我的首选是避免使用内存中的OLTP,并将所有内容写入持久表,并保持消息队列尽可能干净

  • 使用服务器中最快的硬盘驱动器,写入速度相当于NVMe或使用RAID 10等更快。

  • 我将每条命中的消息立即从队列中抓取并将其写入我名为“ mqMessagesReceived”的表中(请参见下面的代码,我的通用MQ处理程序名为mqAsyncQueueMessageOnCreate)

  • 我在“ mqMessagesReceived”表中使用一个触发器,该触发器进行查找以查找执行哪个StoredProcedure来处理每个唯一消息(请参见下面的代码)

  • 每条消息都有一个标识符(在我的情况下,我正在使用向消息写入消息的原始Tablename),并且该标识符用作在触发器的触发器内运行的查询的关键字mqMessagesReceived表,以确定需要运行哪个后续存储过程,以正确处理每个收到的消息。

  • 在MQ上发送消息之前,

可以在调用方创建通用变量(例如,如果触发器将消息放入MQ上)

SELECT @tThisTableName = OBJECT_NAME(parent_object_id) FROM sys.objects 
WHERE sys.objects.name = OBJECT_NAME(@@PROCID)
AND SCHEMA_NAME(sys.objects.schema_id) = OBJECT_SCHEMA_NAME(@@PROCID);
  • 配置表是用于将表名与需要运行的StoredProcedure匹配的查找数据,以处理到达并写入mqMessagesReceived表的MQ数据。

这是该查找表的定义

CREATE TABLE [dbo].[mqMessagesConfig](
    [ID] [int] IDENTITY(1,1) NOT NULL,[tSourceTableReceived] [nvarchar](128) NOT NULL,[tTriggeredStoredProcedure] [nvarchar](128) NOT NULL,CONSTRAINT [PK_mqMessagesConfig] PRIMARY KEY CLUSTERED 
(
    [ID] ASC
)WITH (PAD_INDEX = OFF,STATISTICS_NORECOMPUTE = OFF,IGNORE_DUP_KEY = OFF,ALLOW_ROW_LOCKS = ON,ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO

这是激活存储过程,它在消息进入队列时运行

CREATE PROCEDURE [dbo].[mqAsyncQueueMessageOnCreate]
AS
BEGIN
    SET NOCOUNT ON
 
    DECLARE 
        @h UNIQUEIDENTIFIER,@t sysname,@b varbinary(200),@hand VARCHAR(36),@body VARCHAR(2000),@sqlcleanup nvarchar(MAX)

        
 
    -- Get all of the messages on the queue
    -- the WHILE loop is infinite,until BREAK is received when we get a null handle
    WHILE 1=1
    BEGIN
        SET @h = NULL
 
        --Note the semicolon..!
        ;RECEIVE TOP(1) 
            @h = conversation_handle,@t = message_type_name,@b = message_body
        FROM mqAsyncQueue


        --No message found (handle is now null)
        IF @h IS NULL
        BEGIN
            -- all messages are now processed,but we still have the @hand variable saved from processing the last message
            SET @sqlcleanup = 'EXEC [mqConversationsClearOne]  @handle = N' + char(39) + @hand + char(39) + ';';
            EXECUTE(@sqlcleanup);
            BREAK
        END


        --mqAsyncMessage message type received
        ELSE IF @t = 'mqAsyncMessage'
        BEGIN
            SET @hand = CONVERT(varchar(36),@h);
            SET @body = CONVERT(varchar(2000),@b);
            
            INSERT mqMessagesReceived  (tMessageType,tMessageBody,tMessageBinary,tConversationHandle)
            VALUES (@t,@body,@b,@hand);
        
        END
        
        
        --unknown message type was received that we dont understand
        ELSE
        BEGIN
            INSERT mqMessagesReceived (tMessageBody,tMessageBinary)  
            VALUES ('Unknown message type received',CONVERT(varbinary(MAX),'Unknown message type received'))
        END

    END        

END






CREATE PROCEDURE [dbo].[mqConversationsClearOne]

    @handle varchar(36)

AS

    -- Note: you can check the queue by running this query
    -- SELECT * FROM sys.conversation_endpoints 
    -- SELECT * FROM sys.conversation_endpoints WHERE NOT([State] = 'CO') 
    -- CO = conversing [State]


    DECLARE @getid CURSOR,@sql NVARCHAR(MAX),@conv_id NVARCHAR(100),@conv_handle NVARCHAR(100)


    -- want to create a chain of statements like this,one per conversation
    -- END CONVERSATION 'FE851F37-218C-EA11-B698-4CCC6AD00AE9' WITH CLEANUP;
    -- END CONVERSATION 'A4B4F603-208C-EA11-B698-4CCC6AD00AE9' WITH CLEANUP;    

    SET @getid = CURSOR FOR
                    SELECT [conversation_id],[conversation_handle]  
                    FROM   sys.conversation_endpoints
                    WHERE conversation_handle = @handle;


    OPEN @getid

        FETCH NEXT
        FROM @getid INTO @conv_id,@conv_handle 
        WHILE @@FETCH_STATUS = 0
            BEGIN
                SET @sql = 'END CONVERSATION ' + char(39) + @conv_handle + char(39) + ' WITH CLEANUP;'
                EXEC sys.sp_executesql @stmt = @sql;
                FETCH NEXT
                FROM @getid INTO @conv_id,@conv_handle  --,@conv_service
            END

    CLOSE @getid
    DEALLOCATE @getid

名为“ mqMessagesReceived”的表具有此触发器

CREATE TRIGGER [dbo].[mqMessagesReceived_TriggerUpdate]

ON [dbo].[mqMessagesReceived]

AFTER INSERT

AS
  BEGIN


    DECLARE     

    @strMessageBody nvarchar(4000),@strSourceTable nvarchar(128),@strSourceKey nvarchar(128),@strConfigStoredProcedure nvarchar(4000),@sqlRunStoredProcedure nvarchar(4000),@strErr nvarchar(4000)


    SELECT @strMessageBody= ins.tMessageBody FROM INSERTED ins; 

    SELECT @strSourceTable = (select txt_Value from dbo.fn_ParseText2Table(@strMessageBody,'|') WHERE Position=2);

    SELECT @strSourceKey =  (select txt_Value from dbo.fn_ParseText2Table(@strMessageBody,'|') WHERE Position=3);

    
    -- look in mqMessagesConfig to find the name of the final stored procedure 
    -- to run against the SourceTable
    -- e.g.  @strConfigStoredProcedure = mqProcess-tblLabDaySchedEventsMQ

    SELECT @strConfigStoredProcedure =
    (select tTriggeredStoredProcedure from dbo.mqMessagesConfig WHERE tSourceTableReceived = @strSourceTable);

    

    SET @sqlRunStoredProcedure = 'EXEC [' + @strConfigStoredProcedure + ']  @iKey = ' + @strSourceKey + ';';
    EXECUTE(@sqlRunStoredProcedure);


    INSERT INTO [mqMessagesProcessed]
    ( 
        [tMessageBody],[tSourceTable],[tSourceKey],[tTriggerStoredProcedure]
    )

    VALUES  
    (
        @strMessageBody,@strSourceTable,@strSourceKey,@sqlRunStoredProcedure
    );


  END

此外,我发现我还必须做一些常规的SQL Server调整建议(用于处理繁忙的数据库)

默认情况下,每个SQL Server只有一个单个TempDB文件,并且TempDB的初始大小为8MB

但是,每次服务器重新启动时,TempDB都重置为初始8MB大小,并且该公司每个周末都通过cron / taskscheduler重新启动服务器。

我们看到的问题是数据库运行缓慢且记录锁定很多,但这只是星期一早上的第一件事,当时每个人在开始工作一周时都会立即对数据库进行锤击。

当TempDB自动调整大小时,它被“锁定”,因此根本没有人可以使用该单个TempDB(这就是SQL Server经常变得不响应的原因)

到星期五,TempDB已增长到300MB以上。

因此...为了解决以下最佳实践建议,我为每个vCPU创建了一个TempDB文件,因此我创建了8个TempDB文件,并将它们分布在该服务器上的两个可用硬盘驱动器上,最重要的是,设置它们初始大小超过我们的需要(我选择了200MB)。

这解决了每个星期一早上遇到的SQL Server速度下降和记录锁定问题。