Problem description
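I'm trying to implement SCD Type 2 merge logic in BigQuery.
I do this in 3 steps, but every time I run the merge script it seems to push records into the target table, even when there are no new records: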
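- Step 1: when a record matches on the natural key and its type 2 checksum has changed, expire the active record in the target table
- Step 2: insert records whose key did not exist before into the target table
- Step 3: insert the updated records from the source table into the target table (this is where I get duplicates)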
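A quick way to confirm the duplicates from step 3 is to check the basic SCD type 2 invariant: every natural key should have at most one current row. The sketch below uses Python's built-in sqlite3 module as a stand-in for BigQuery; the table `target` and its columns `fp` (fingerprint), `chk` (type2_checksum) and `current_flg` are simplified, hypothetical names, not the real schema.

```python
import sqlite3

def keys_with_multiple_current_rows(con: sqlite3.Connection) -> list[str]:
    """Return the keys (fingerprints) that have more than one current ('Y') row."""
    rows = con.execute("""
        SELECT fp, COUNT(*) FROM target
        WHERE current_flg = 'Y'
        GROUP BY fp
        HAVING COUNT(*) > 1
        ORDER BY fp""").fetchall()
    return [fp for fp, _count in rows]

# Hypothetical sample data: k1 has a clean history, k2 has a duplicated current row.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE target (fp TEXT, chk TEXT, current_flg TEXT)")
con.executemany("INSERT INTO target VALUES (?, ?, ?)", [
    ("k1", "v1", "N"), ("k1", "v2", "Y"),
    ("k2", "w1", "Y"), ("k2", "w1", "Y"),
])
print(keys_with_multiple_current_rows(con))  # ['k2']
```

The same GROUP BY / HAVING COUNT(*) > 1 query, with the real fingerprint and flag column names, can be run against the destination table after each run of the procedure.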
Here is my code:
CREATE OR REPLACE PROCEDURE `processed.sp_merge_example` (job_run_id INT64) OPTIONS (strict_mode = false)
BEGIN
DECLARE run_id INT64 DEFAULT job_run_id;
-- Steps 1 and 2: match source and target rows on the natural key (fingerprint)
MERGE INTO `processed.<Destination table>` AS T
USING `transient.<Source table>` AS S
ON T.<Destination table>_fingerprint = S.<Source table>_fingerprint
WHEN MATCHED
AND S.type2_checksum != T.type2_checksum
AND T.current_flg = 'Y'
THEN
UPDATE
-- Step 1: expire the current row of each key whose checksum changed in the incremental load.
SET T.modified_datetime = current_datetime(),
T.modified_by = CAST(run_id AS STRING),
T.end_datetime = current_datetime(),
T.current_flg = 'N'
WHEN NOT MATCHED
THEN
-- Step 2: insert records whose key does not exist in the target yet.
INSERT (
<Source table columns>,
source_type,material_sales_fingerprint,type2_checksum,start_datetime,end_datetime,current_flg,created_by,created_datetime,modified_by,modified_datetime
)
VALUES (
<Source table values>,
S.source_type,S.material_sales_fingerprint,S.type2_checksum,current_datetime(),parse_date('%Y%m%d','99991231'),'Y',CAST(run_id AS STRING),current_datetime(),CAST(run_id AS STRING),current_datetime()
);
-- Step 3: insert the new version of each updated record
INSERT `processed.dim_material_sales` (
<Source table columns>,
source_type,material_sales_fingerprint,type2_checksum,modified_datetime
)
SELECT
<Source table columns>,
ST.source_type,ST.material_sales_fingerprint,ST.type2_checksum,current_datetime()
FROM (
SELECT x.*
FROM `transient.<Source table>` x
JOIN `processed.<Destination table>` y
ON x.<Source table>_fingerprint = y.<Source table>_fingerprint
WHERE x.type2_checksum != y.type2_checksum and y.current_flg = 'N'
) ST;
END;
This last piece of logic is what causes the duplicates:
ON x.<Source table>_fingerprint = y.<Source table>_fingerprint
WHERE x.type2_checksum != y.type2_checksum and y.current_flg = 'N'
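If I run the merge logic n times with the same records in the source, I expect the target table to end up with the same records. With the logic above, however, the updated records from the source are inserted into the target table again on every run. I've tested this many times and I'm not sure where I'm going wrong. Can someone help me understand?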
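The reason the updated rows keep coming back can be reproduced outside BigQuery. The sketch below replays the three steps with Python's sqlite3 module on two hypothetical keys. SQLite has no MERGE, so the MERGE is split into an UPDATE and an INSERT with the same conditions, and the column names are simplified (`fp` for the fingerprint, `chk` for type2_checksum). It also marks the step 3 rows as current ('Y'), which the original INSERT does not do; the row count grows either way.

```python
import sqlite3

def run_original_steps(con: sqlite3.Connection) -> None:
    # Step 1 (MERGE ... WHEN MATCHED): expire the current row if the checksum changed.
    con.execute("""
        UPDATE target SET current_flg = 'N'
        WHERE current_flg = 'Y'
          AND EXISTS (SELECT 1 FROM source s
                      WHERE s.fp = target.fp AND s.chk != target.chk)""")
    # Step 2 (MERGE ... WHEN NOT MATCHED): insert keys the target has never seen.
    con.execute("""
        INSERT INTO target (fp, chk, current_flg)
        SELECT s.fp, s.chk, 'Y' FROM source s
        WHERE NOT EXISTS (SELECT 1 FROM target t WHERE t.fp = s.fp)""")
    # Step 3 (separate INSERT): join against *expired* rows. The expired old
    # version stays in the table, so this join matches again on every run.
    con.execute("""
        INSERT INTO target (fp, chk, current_flg)
        SELECT x.fp, x.chk, 'Y' FROM source x
        JOIN target y ON x.fp = y.fp
        WHERE x.chk != y.chk AND y.current_flg = 'N'""")

def row_counts_after_runs(runs: int) -> list[int]:
    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE target (fp TEXT, chk TEXT, current_flg TEXT)")
    con.execute("CREATE TABLE source (fp TEXT, chk TEXT)")
    con.execute("INSERT INTO target VALUES ('k1', 'v1', 'Y')")  # existing current version
    con.executemany("INSERT INTO source VALUES (?, ?)",
                    [("k1", "v2"),    # changed record
                     ("k2", "w1")])   # brand-new key
    counts = []
    for _ in range(runs):  # same source data on every run
        run_original_steps(con)
        counts.append(con.execute("SELECT COUNT(*) FROM target").fetchone()[0])
    return counts

print(row_counts_after_runs(3))  # [3, 4, 5]
```

Step 3 looks for an expired row whose checksum differs from the source. That row is the old version of the record, and it stays in the table after the first run, so the join keeps finding it and re-inserts the new version every time.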
Solution
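I got around the problem with the logic below. Compared with my original version, the new versions of changed records are inserted before the MERGE instead of after it, and that INSERT joins on the current row (current_flg = 'Y') instead of an expired one (current_flg = 'N'). After one run the current row already carries the source checksum, so rerunning the procedure with the same source finds nothing to insert or expire.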
CREATE OR REPLACE PROCEDURE `<stored_proc_name >`(job_run_id INT64) OPTIONS(strict_mode=false)
BEGIN
DECLARE run_id INT64 DEFAULT job_run_id;
-- Step 1: insert the new version of each record that changed in the incremental load, before the MERGE expires the old one.
INSERT
`<destination_table>`(
<Target_table_columns>,tablename_fingerprint,type2_checksum,start_datetime,end_datetime,current_flg,created_by,created_datetime,modified_by,modified_datetime)
SELECT
ST.<Target_table_columns>,ST.<source_temp_table_fingerprint_column>,ST.type2_checksum,
current_datetime(),cast(parse_timestamp('%Y%m%d%H%M%S','99991231235959') as datetime),'Y',CAST(run_id AS STRING),current_datetime(),CAST(run_id AS STRING),current_datetime()
FROM (
SELECT x.*
FROM
`<source_table>` x
JOIN
`<destination_table>` y
ON
y.<destination_table_fingerprint_column> = x.<source_temp_table_fingerprint_column>
Where x.type2_checksum != y.type2_checksum and y.current_flg = 'Y') ST;
---Merge Process Starts
MERGE INTO
`<destination_table>` AS T
USING
`<source_table>` AS S
ON
T.<destination_table_fingerprint_column> = S.<source_temp_table_fingerprint_column>
-- Step 2: expire the older version of each changed record.
WHEN MATCHED
AND S.type2_checksum != T.type2_checksum
AND T.current_flg = 'Y'
THEN
UPDATE
SET T.modified_datetime = current_datetime(),T.modified_by = CAST(run_id AS STRING),T.end_datetime = current_datetime(),T.current_flg = 'N'
-- Step 3: insert brand-new keys from the temp table into the final table.
WHEN NOT MATCHED
THEN
INSERT
(
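<Target_table_columns>,tablename_fingerprint,type2_checksum,start_datetime,end_datetime,current_flg,created_by,created_datetime,modified_by,modified_datetime
)
VALUES
(
<S.Target_table_columns>,S.<source_temp_table_fingerprint_column>,S.type2_checksum,current_datetime(),cast(parse_timestamp('%Y%m%d%H%M%S','99991231235959') as datetime),'Y',CAST(run_id AS STRING),current_datetime(),CAST(run_id AS STRING),current_datetime()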
);
END;
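The same kind of replay shows why the reordered procedure is idempotent. This is again a sketch in Python's sqlite3 with simplified, hypothetical column names (`fp`, `chk`) and the MERGE split into an UPDATE and an INSERT; it assumes the source holds at most one row per key.

```python
import sqlite3

def run_fixed_steps(con: sqlite3.Connection) -> None:
    # Step 1: add a new current version only where the *current* ('Y') row differs.
    con.execute("""
        INSERT INTO target (fp, chk, current_flg)
        SELECT x.fp, x.chk, 'Y' FROM source x
        JOIN target y ON x.fp = y.fp
        WHERE x.chk != y.chk AND y.current_flg = 'Y'""")
    # Step 2 (MERGE ... WHEN MATCHED): expire current rows whose checksum differs.
    # The row added in step 1 has the source checksum, so it stays current.
    con.execute("""
        UPDATE target SET current_flg = 'N'
        WHERE current_flg = 'Y'
          AND EXISTS (SELECT 1 FROM source s
                      WHERE s.fp = target.fp AND s.chk != target.chk)""")
    # Step 3 (MERGE ... WHEN NOT MATCHED): insert keys the target has never seen.
    con.execute("""
        INSERT INTO target (fp, chk, current_flg)
        SELECT s.fp, s.chk, 'Y' FROM source s
        WHERE NOT EXISTS (SELECT 1 FROM target t WHERE t.fp = s.fp)""")

def row_counts_after_runs(runs: int) -> list[int]:
    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE target (fp TEXT, chk TEXT, current_flg TEXT)")
    con.execute("CREATE TABLE source (fp TEXT, chk TEXT)")
    con.execute("INSERT INTO target VALUES ('k1', 'v1', 'Y')")  # existing current version
    con.executemany("INSERT INTO source VALUES (?, ?)",
                    [("k1", "v2"),    # changed record
                     ("k2", "w1")])   # brand-new key
    counts = []
    for _ in range(runs):  # same source data on every run
        run_fixed_steps(con)
        counts.append(con.execute("SELECT COUNT(*) FROM target").fetchone()[0])
    return counts

print(row_counts_after_runs(3))  # [3, 3, 3]
```

On the first run the new version is inserted with the source checksum, so the expire step only touches the old row. On later runs the current row already matches the source, and every statement finds nothing to do.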