Azure Synapse 按顺序处理 20k 条语句的最快方法

问题描述

我正在为基于云的数据库 (Azure) 设计增量更新过程。唯一现有的更改日志是一个 .txt 文件,它记录了数据库处理的每个插入、删除和更新语句。没有可用的更改数据捕获表,或任何记录更改的数据库表,我无法在数据库上启用水印。 .txt 文件的结构如下:

update [table] set x = 'data' where y = 'data'
go
insert into [table] values (data)
go
delete from [table] where x = data
go

我已经构建了将 .txt 文件转换为云端表格的流程,如下所示:

update_id | db_operation | statement                                       | user  | processed_flag
----------|--------------|-------------------------------------------------|-------|---------------
1         | 'update'     | 'update [table] set x = data where y = data'    | user1 | 0
2         | 'insert'     | 'insert into [table] values (data)'             | user2 | 0
3         | 'delete'     | 'delete from [table] where x = data'            | user3 | 1

我使用这段代码为未处理的事务创建一个临时表,然后遍历该表,创建一个 sql 语句,然后执行该事务。

CREATE TABLE temp_incremental_updates
WITH
( 
    disTRIBUTION = HASH ( [user] ),HEAP
)
AS
SELECT  ROW_NUMBER() OVER(ORDER BY (SELECT NULL)) AS Sequence,[user],[statement]
FROM    upd.incremental_updates
WHERE   processed_flag = 0;

DECLARE @nbr_statements INT = (SELECT COUNT(*) FROM temp_incremental_updates),@i INT = 1;

WHILE   @i <= @nbr_statements
BEGIN
    DECLARE @sql_code NVARCHAR(4000) = (SELECT [statement] FROM temp_incremental_updates WHERE Sequence = @i);
    EXEC    sp_executesql @sql_code;
    SET     @i +=1;
END

DROP TABLE temp_incremental_updates;

UPDATE incremental_updates SET processed_flag = 1

这需要很长时间,超过一个小时。有没有不同的方法可以快速处理需要以特定顺序发生的多个 sql 语句?顺序是相关的,因为例如:如果我尝试在创建该数据的插入语句之前处理删除语句,azure synapse 将抛出错误

解决方法

不到 2 小时的 20k 条语句对 Synapse 来说已经相当不错了!

Synapse 不打算进行事务处理。您需要将单个更新转换为批量更新,并对大批量或行执行 statements like MERGE,而不是对每行执行 INSERTUPDATEDELETE

在您的情况下,您可以:

  • 按表名对所有插入/更新进行分组
  • 为每个组创建一个临时表。例如。 table1_insert_updates
  • MERGEtable1_insert_updates 运行 table1 之类的语句。

对于删除:

  • 按表名对主键进行分组
  • 每桌运行一个DELETE FROM table1 where key in (primary keys)

坦率地说,20k 是一个很糟糕的数字,它不是太小,也远远不够大。因此,即使在“分组”之后,如果批处理/组大小太小,您仍然可能会遇到性能问题。

Synapse 不用于事务处理。它将使用单个 MERGE 语句在不到 5 分钟的时间内将具有 100 万行的表合并到具有 10 亿行的表中,以向上插入 100 万行,但是如果您一个接一个地运行 1000 个删除和 1000 个插入语句,它将可能需要更长的时间!


编辑:您还必须使用 PARTITION BYRANK(或 ROWNUMBER)进行重复数据删除,以防在单个批次中对同一行进行多次更新。这并不容易,具体取决于您的输入方式(更新包含所有列(甚至未更改)或仅更改的列),这可能会变得非常复杂。

再次Synapse 不用于事务处理。

,

尝试声明一个游标来一次性从 temp_incremental_updates 中选择所有数据,而不是进行多次读取:

CREATE TABLE temp_incremental_updates
WITH
( 
    DISTRIBUTION = HASH ( [user] ),HEAP
)
AS
SELECT  ROW_NUMBER() OVER(ORDER BY (SELECT NULL)) AS Sequence,[user],[statement]
FROM    upd.incremental_updates
WHERE   processed_flag = 0;

DECLARE cur CURSOR FOR SELECT [statement] FROM temp_incremental_updates ORDER BY Sequence

OPEN cur
FETCH NEXT FROM cur INTO @sql_code
WHILE @@FETCH_STATUS = 0 BEGIN
  EXEC sp_executesql @sql_code;
  FETCH NEXT FROM cur INTO @sql_code
END

-- Rest of the code