问题描述
我们每天都有一个流,我们会在其中获取使用各种产品的客户列表。
我正在尝试为客户创建一个表格,我们可以在其中跟踪他们的变化,同时,我们可以获得一个不同的客户列表。
该流每天包含数千条记录。这就是我们认为应该从 SCD Type 1 转向 SCD Type 2 的原因。
我们想实现这个过程,让它每天运行,获取最后一天的记录,并将它们与整个表进行比较。如果客户有任何更改,它将将该行标记为 0,并获取新行并将其标记为 1。
但是在这个过程中,我得到了新的记录,但是我在运行存储过程时也得到了重复的数据。
请指导。
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
Create PROC [dbo].[sp_UpdateCustomerInfoHistory] AS BEGIN
SET
NOCOUNT ON --Truncate Table [dbo].[CustomerInfoHistory];
DECLARE @TODAY DATE = GETDATE();
DECLARE @YESTERDAY DATE = GETDATE() - 1;
WITH CTE AS (
SELECT
disTINCT(a.CustomerId) AS CustomerId,ISNULL(b.[CustomerName],a.[CustomerName]) AS CustomerName,ISNULL(b.[CurrentDefaultDomain],a.[CustomerName]) AS CurrentDefaultDomain,ISNULL(b.[CustomerCountryCode],'UnkNown') AS CustomerCountryCode,ISNULL(b.[HasC],0) AS HasC,ISNULL(b.[HasG],0) AS HasG,ISNULL(b.[IsV],0) AS IsV,ISNULL(
ISNULL(b.[CustomerCreatedDate],a.[ProductCreatedTimeUtc]),@TODAY
) AS CustomerCreatedDate,ISNULL(b.[CustomerState],'Active') AS CustomerState,ISNULL(b.[CustomerType],'RegularCustomer') AS CustomerType,ISNULL(b.[DataCenterProduct],'UnkNown') AS DataCenterProduct,ISNULL(b.[DataCenterModel],'UnkNown') AS DataCenterModel,ISNULL(b.[IsTestCustomer],0) AS IsTestCustomer,ISNULL(b.[CommunicationLanguage],'UnkNown') AS CommunicationLanguage,ISNULL(b.[IsInternal],0) AS IsInternal,ISNULL(b.[IndustryName],'N/A') AS IndustryName,ISNULL(c.MappingID,0) AS MappingID
FROM
[dbo].[ProductDetails] AS a
LEFT JOIN [Common].[vwdimCustomer_Staging] AS b ON a.CustomerId = b.CustomerId
LEFT JOIN [Common].[vwmapCustomerMappingID_Staging] AS c ON b.CustomerId = c.CustomerId
WHERE a.[TIMESTAMP] = @YESTERDAY
),CTE1 AS (
Select *,BINARY_CHECKSUM(
CustomerId,CustomerName,IsTestCustomer,IsInternal
) AS MKEY
from CTE)
MERGE INTO [dbo].[CustomerInfoHistory] AS T USING CTE1 AS S ON T.[MKEY] = S.[MKEY]
WHEN MATCHED
AND [Current_Flag] = 1
AND T.CustomerName <> S.CustomerName THEN
UPDATE
SET
T.Current_Flag = 0,T.End_date = @YESTERDAY
WHEN NOT MATCHED BY TARGET THEN
INSERT
(
CustomerId,CurrentDefaultDomain,CustomerCountryCode,HasC,HasG,IsV,CustomerCreatedDate,CustomerState,CustomerType,DataCenterProduct,DataCenterModel,CommunicationLanguage,IsInternal,IndustryName,MappingID,Eff_Date,End_Date,Current_Flag,MKEY,RefreshedDate
)
VALUES
(
S.CustomerId,S.CustomerName,S.CurrentDefaultDomain,S.CustomerCountryCode,S.HasC,S.HasG,S.IsV,S.CustomerCreatedDate,S.CustomerState,S.CustomerType,S.DataCenterProduct,S.DataCenterModel,S.IsTestCustomer,S.CommunicationLanguage,S.IsInternal,S.IndustryName,S.MappingID,@YESTERDAY,'12/31/2099',1,S.MKEY,@TODAY
);
END
解决方法
我认为您可以在 Azure Synapse 中使用 MERGE。它将根据主键值插入新行或更新旧行。
例如:
- 创建表:
CREATE TABLE dbo.CustomerInfoHistory (
CustomerId int NOT NULL,CustomerName nvarchar(254) NOT NULL,CurrentDefaultDomain nvarchar(max) NULL
);
GO
ALTER TABLE dbo.CustomerInfoHistory ADD CONSTRAINT PK__kruserpr__6E092EE804688C07 PRIMARY KEY (CustomerId,CustomerName);
GO
- 创建一个名为
dbo.CustomerInfoHistory_type
的表值参数,它将在我的存储过程中使用:
create TYPE dbo.CustomerInfoHistory_type AS TABLE(
CustomerId int NOT NULL,CurrentDefaultDomain nvarchar(max)
)
GO
- 创建一个存储过程,它将合并相同的记录并根据主键插入新记录:
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
create PROCEDURE [dbo].[spUpsertCustomerInfoHistory]
@profile dbo.CustomerInfoHistory_type READONLY
AS
BEGIN
MERGE dbo.CustomerInfoHistory AS target_sqldb
USING @profile AS source_tblstg
ON (target_sqldb.CustomerId = source_tblstg.CustomerId and target_sqldb.CustomerName = source_tblstg.CustomerName )
WHEN MATCHED THEN
UPDATE SET
CurrentDefaultDomain = source_tblstg.CurrentDefaultDomain
WHEN NOT MATCHED THEN
INSERT (
CustomerId,CustomerName,CurrentDefaultDomain
)
VALUES (
source_tblstg.CustomerId,source_tblstg.CustomerName,source_tblstg.CurrentDefaultDomain
);
END
GO
- 之后,我们可以通过以下代码执行存储过程:
DECLARE @profileVar AS dbo.CustomerInfoHistory_type;
/* Add data to the table variable. */
INSERT INTO @profileVar (CustomerId,CurrentDefaultDomain) values (1,'tom','wednesday');
exec [dbo].[spUpsertCustomerInfoHistory] @profileVar
仅此而已。