问题描述
我正在为基于云的数据库 (Azure) 设计增量更新过程。唯一现有的更改日志是一个 .txt 文件,它记录了数据库处理的每个插入、删除和更新语句。没有可用的更改数据捕获表,或任何记录更改的数据库表,我无法在数据库上启用水印。 .txt 文件的结构如下:
update [table] set x = 'data' where y = 'data'
go
insert into [table] values (data)
go
delete from [table] where x = data
go
我已经构建了将 .txt 文件转换为云端表格的流程,如下所示:
update_id | db_operation | statement | user | processed_flag
----------|--------------|-------------------------------------------------|-------|---------------
1 | 'update' | 'update [table] set x = data where y = data' | user1 | 0
2 | 'insert' | 'insert into [table] values (data)' | user2 | 0
3 | 'delete' | 'delete from [table] where x = data' | user3 | 1
我使用这段代码为未处理的事务创建一个临时表,然后遍历该表,创建一个 sql 语句,然后执行该事务。
CREATE TABLE temp_incremental_updates
WITH
(
disTRIBUTION = HASH ( [user] ),HEAP
)
AS
SELECT ROW_NUMBER() OVER(ORDER BY (SELECT NULL)) AS Sequence,[user],[statement]
FROM upd.incremental_updates
WHERE processed_flag = 0;
DECLARE @nbr_statements INT = (SELECT COUNT(*) FROM temp_incremental_updates),@i INT = 1;
WHILE @i <= @nbr_statements
BEGIN
DECLARE @sql_code NVARCHAR(4000) = (SELECT [statement] FROM temp_incremental_updates WHERE Sequence = @i);
EXEC sp_executesql @sql_code;
SET @i +=1;
END
DROP TABLE temp_incremental_updates;
UPDATE incremental_updates SET processed_flag = 1
这需要很长时间,超过一个小时。有没有不同的方法可以快速处理需要以特定顺序发生的多个 sql 语句?顺序是相关的,因为例如:如果我尝试在创建该数据的插入语句之前处理删除语句,azure synapse 将抛出错误。
解决方法
不到 2 小时的 20k 条语句对 Synapse 来说已经相当不错了!
Synapse 不打算进行事务处理。您需要将单个更新转换为批量更新,并对大批量或行执行 statements like MERGE
,而不是对每行执行 INSERT
、UPDATE
和 DELETE
。
在您的情况下,您可以:
- 按表名对所有插入/更新进行分组
- 为每个组创建一个临时表。例如。
table1_insert_updates
- 从
MERGE
到table1_insert_updates
运行table1
之类的语句。
对于删除:
- 按表名对主键进行分组
- 每桌运行一个
DELETE FROM table1 where key in (primary keys)
。
坦率地说,20k 是一个很糟糕的数字,它不是太小,也远远不够大。因此,即使在“分组”之后,如果批处理/组大小太小,您仍然可能会遇到性能问题。
Synapse 不用于事务处理。它将使用单个 MERGE 语句在不到 5 分钟的时间内将具有 100 万行的表合并到具有 10 亿行的表中,以向上插入 100 万行,但是如果您一个接一个地运行 1000 个删除和 1000 个插入语句,它将可能需要更长的时间!
编辑:您还必须使用 PARTITION BY
和 RANK
(或 ROWNUMBER
)进行重复数据删除,以防在单个批次中对同一行进行多次更新。这并不容易,具体取决于您的输入方式(更新包含所有列(甚至未更改)或仅更改的列),这可能会变得非常复杂。
再次Synapse 不用于事务处理。
,尝试声明一个游标来一次性从 temp_incremental_updates 中选择所有数据,而不是进行多次读取:
CREATE TABLE temp_incremental_updates
WITH
(
DISTRIBUTION = HASH ( [user] ),HEAP
)
AS
SELECT ROW_NUMBER() OVER(ORDER BY (SELECT NULL)) AS Sequence,[user],[statement]
FROM upd.incremental_updates
WHERE processed_flag = 0;
DECLARE cur CURSOR FOR SELECT [statement] FROM temp_incremental_updates ORDER BY Sequence
OPEN cur
FETCH NEXT FROM cur INTO @sql_code
WHILE @@FETCH_STATUS = 0 BEGIN
EXEC sp_executesql @sql_code;
FETCH NEXT FROM cur INTO @sql_code
END
-- Rest of the code