问题描述
下面是我一直在使用的Service broker
进行sql执行框架设计的概述。我已经概述了过程,并提出了一些问题(使用引号突出显示),并且有兴趣听取有关设计的任何建议。
概述
我有一个ETL操作,需要从5个数据库中取出数据,然后使用select/insert
语句或stored procedures
将其移至150个数据库中。结果是大约2,000个独立查询,每个查询耗时1到1个小时。
每个SQL查询仅插入数据。无需返回数据。
该操作可以分为3个步骤:
- ETL之前的
- ETL
- ETL后
每个步骤中的查询都可以按任意顺序执行,但是步骤必须保持顺序。
方法
我正在使用Service broker
进行异步/并行执行。
关于如何调整服务代理的任何建议(例如,要查看的任何特定选项或用于设置队列工作者数量的指南?
服务代理设计
启动器 initiator
将包含SQL查询的XML
消息与称为Unprocessed
的激活存储过程一起发送到ProcessUnprocessedQueue
队列。此过程包装在事务中的try/catch
中,在出现异常时回滚事务。
ProcessUnprocessedQueue
将XML传递到过程Execsql
我需要登录:
- 已执行的SQL查询
- 有关操作的元数据
- 每个过程完成所需的时间
- 这就是为什么我在流程的开头插入一行,并在流程结束时插入一行 的原因
- 任何异常(如果存在)
最好有一个包含查询信息的In-Memory OLTP
表?因此,我将在操作开始前一行INSERT
,然后执行UPDATE
或INSERT
来记录异常和执行时间。批处理完成后,我将数据存档到存储在磁盘中的表中,以防止表变得太大。
执行后,ProcessUnprocessedQueue
返回XML
的更新版本(以确定执行是否成功,或有关事务的其他数据以进行后处理),然后发送该消息到ProcessedQueue
,没有没有激活过程,因此可以手动进行处理(我需要知道一批查询何时完成执行)。
处理查询
由于ETL可以分为3个步骤,因此我创建了3个XML
变量,在其中添加了ETL操作所需的所有查询,因此我将得到以下内容:
为什么使用XML?
XML队列变量作为
OUTPUT
参数在不同的存储过程之间传递,该参数更新其值和/或向其添加SQL查询。该变量需要写入和读取,因此替代方案可以是诸如全局临时表或持久性表之类的东西。
然后我处理XML
变量:
- 使用
cursor
循环查询,并将其发送到服务代理服务。-
XML
变量中包含的每组查询都在同一conversation_group_id
下发送。 - 诸如“ to / from服务”,“消息类型”之类的值都存储在
XML
变量中。
-
- 将消息发送到Service broker之后,使用while循环连续检查
ProcessedQueue
,直到处理完所有消息为止。
解决方法
我还没有建立起您现在正在做的任何事情,但是我会为您提供对我有用的东西,以及一些一般性意见...
-
我的首选是避免使用内存中的OLTP,并将所有内容写入持久表,并保持消息队列尽可能干净
-
使用服务器中最快的硬盘驱动器,写入速度相当于NVMe或使用RAID 10等更快。
-
我将每条命中的消息立即从队列中抓取并将其写入我名为“ mqMessagesReceived”的表中(请参见下面的代码,我的通用MQ处理程序名为mqAsyncQueueMessageOnCreate)
-
我在“ mqMessagesReceived”表中使用一个触发器,该触发器进行查找以查找执行哪个StoredProcedure来处理每个唯一消息(请参见下面的代码)
-
每条消息都有一个标识符(在我的情况下,我正在使用向消息写入消息的原始Tablename),并且该标识符用作在触发器的触发器内运行的查询的关键字mqMessagesReceived表,以确定需要运行哪个后续存储过程,以正确处理每个收到的消息。
-
在MQ上发送消息之前,
可以在调用方创建通用变量(例如,如果触发器将消息放入MQ上)
SELECT @tThisTableName = OBJECT_NAME(parent_object_id) FROM sys.objects
WHERE sys.objects.name = OBJECT_NAME(@@PROCID)
AND SCHEMA_NAME(sys.objects.schema_id) = OBJECT_SCHEMA_NAME(@@PROCID);
- 配置表是用于将表名与需要运行的StoredProcedure匹配的查找数据,以处理到达并写入mqMessagesReceived表的MQ数据。
这是该查找表的定义
CREATE TABLE [dbo].[mqMessagesConfig](
[ID] [int] IDENTITY(1,1) NOT NULL,[tSourceTableReceived] [nvarchar](128) NOT NULL,[tTriggeredStoredProcedure] [nvarchar](128) NOT NULL,CONSTRAINT [PK_mqMessagesConfig] PRIMARY KEY CLUSTERED
(
[ID] ASC
)WITH (PAD_INDEX = OFF,STATISTICS_NORECOMPUTE = OFF,IGNORE_DUP_KEY = OFF,ALLOW_ROW_LOCKS = ON,ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]
GO
这是激活存储过程,它在消息进入队列时运行
CREATE PROCEDURE [dbo].[mqAsyncQueueMessageOnCreate]
AS
BEGIN
SET NOCOUNT ON
DECLARE
@h UNIQUEIDENTIFIER,@t sysname,@b varbinary(200),@hand VARCHAR(36),@body VARCHAR(2000),@sqlcleanup nvarchar(MAX)
-- Get all of the messages on the queue
-- the WHILE loop is infinite,until BREAK is received when we get a null handle
WHILE 1=1
BEGIN
SET @h = NULL
--Note the semicolon..!
;RECEIVE TOP(1)
@h = conversation_handle,@t = message_type_name,@b = message_body
FROM mqAsyncQueue
--No message found (handle is now null)
IF @h IS NULL
BEGIN
-- all messages are now processed,but we still have the @hand variable saved from processing the last message
SET @sqlcleanup = 'EXEC [mqConversationsClearOne] @handle = N' + char(39) + @hand + char(39) + ';';
EXECUTE(@sqlcleanup);
BREAK
END
--mqAsyncMessage message type received
ELSE IF @t = 'mqAsyncMessage'
BEGIN
SET @hand = CONVERT(varchar(36),@h);
SET @body = CONVERT(varchar(2000),@b);
INSERT mqMessagesReceived (tMessageType,tMessageBody,tMessageBinary,tConversationHandle)
VALUES (@t,@body,@b,@hand);
END
--unknown message type was received that we dont understand
ELSE
BEGIN
INSERT mqMessagesReceived (tMessageBody,tMessageBinary)
VALUES ('Unknown message type received',CONVERT(varbinary(MAX),'Unknown message type received'))
END
END
END
CREATE PROCEDURE [dbo].[mqConversationsClearOne]
@handle varchar(36)
AS
-- Note: you can check the queue by running this query
-- SELECT * FROM sys.conversation_endpoints
-- SELECT * FROM sys.conversation_endpoints WHERE NOT([State] = 'CO')
-- CO = conversing [State]
DECLARE @getid CURSOR,@sql NVARCHAR(MAX),@conv_id NVARCHAR(100),@conv_handle NVARCHAR(100)
-- want to create a chain of statements like this,one per conversation
-- END CONVERSATION 'FE851F37-218C-EA11-B698-4CCC6AD00AE9' WITH CLEANUP;
-- END CONVERSATION 'A4B4F603-208C-EA11-B698-4CCC6AD00AE9' WITH CLEANUP;
SET @getid = CURSOR FOR
SELECT [conversation_id],[conversation_handle]
FROM sys.conversation_endpoints
WHERE conversation_handle = @handle;
OPEN @getid
FETCH NEXT
FROM @getid INTO @conv_id,@conv_handle
WHILE @@FETCH_STATUS = 0
BEGIN
SET @sql = 'END CONVERSATION ' + char(39) + @conv_handle + char(39) + ' WITH CLEANUP;'
EXEC sys.sp_executesql @stmt = @sql;
FETCH NEXT
FROM @getid INTO @conv_id,@conv_handle --,@conv_service
END
CLOSE @getid
DEALLOCATE @getid
名为“ mqMessagesReceived”的表具有此触发器
CREATE TRIGGER [dbo].[mqMessagesReceived_TriggerUpdate]
ON [dbo].[mqMessagesReceived]
AFTER INSERT
AS
BEGIN
DECLARE
@strMessageBody nvarchar(4000),@strSourceTable nvarchar(128),@strSourceKey nvarchar(128),@strConfigStoredProcedure nvarchar(4000),@sqlRunStoredProcedure nvarchar(4000),@strErr nvarchar(4000)
SELECT @strMessageBody= ins.tMessageBody FROM INSERTED ins;
SELECT @strSourceTable = (select txt_Value from dbo.fn_ParseText2Table(@strMessageBody,'|') WHERE Position=2);
SELECT @strSourceKey = (select txt_Value from dbo.fn_ParseText2Table(@strMessageBody,'|') WHERE Position=3);
-- look in mqMessagesConfig to find the name of the final stored procedure
-- to run against the SourceTable
-- e.g. @strConfigStoredProcedure = mqProcess-tblLabDaySchedEventsMQ
SELECT @strConfigStoredProcedure =
(select tTriggeredStoredProcedure from dbo.mqMessagesConfig WHERE tSourceTableReceived = @strSourceTable);
SET @sqlRunStoredProcedure = 'EXEC [' + @strConfigStoredProcedure + '] @iKey = ' + @strSourceKey + ';';
EXECUTE(@sqlRunStoredProcedure);
INSERT INTO [mqMessagesProcessed]
(
[tMessageBody],[tSourceTable],[tSourceKey],[tTriggerStoredProcedure]
)
VALUES
(
@strMessageBody,@strSourceTable,@strSourceKey,@sqlRunStoredProcedure
);
END
此外,我发现我还必须做一些常规的SQL Server调整建议(用于处理繁忙的数据库)
默认情况下,每个SQL Server只有一个单个TempDB文件,并且TempDB的初始大小为8MB
但是,每次服务器重新启动时,TempDB都重置为初始8MB大小,并且该公司每个周末都通过cron / taskscheduler重新启动服务器。
我们看到的问题是数据库运行缓慢且记录锁定很多,但这只是星期一早上的第一件事,当时每个人在开始工作一周时都会立即对数据库进行锤击。
当TempDB自动调整大小时,它被“锁定”,因此根本没有人可以使用该单个TempDB(这就是SQL Server经常变得不响应的原因)
到星期五,TempDB已增长到300MB以上。
因此...为了解决以下最佳实践建议,我为每个vCPU创建了一个TempDB文件,因此我创建了8个TempDB文件,并将它们分布在该服务器上的两个可用硬盘驱动器上,最重要的是,设置它们初始大小超过我们的需要(我选择了200MB)。
这解决了每个星期一早上遇到的SQL Server速度下降和记录锁定问题。