问题描述
我在了解Streams如何跟踪更改方面遇到麻烦。我想创建一个历史表来跟踪每个UPDATE
和DELETE
到一个表,但是我发现我不明白它是如何工作的。
如果我的表Table1
带有流:
CREATE TABLE Table1
(
XID INT IDENTITY PRIMARY KEY,FIELD1 INT,FIELD2 STRING,DATECREATED TIMESTAMP DEFAULT CURRENT_TIMESTAMP::TIMESTAMP
);
CREATE STREAM Table1_History ON TABLE Table1;
如果我插入数据:
INSERT INTO Table1 (FIELD1,FIELD2)
VALUES
(101,'String1'),(102,'String2')
;
然后运行:
SELECT * FROM Table1_History;
它返回以下内容:
XID FIELD1 FIELD2 DATECREATED MetaDATA$ACTION MetaDATA$ISUPDATE MetaDATA$ROW_ID
1 101 String1 2020-08-13 06:52:34.402 INSERT FALSE 23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
2 102 String2 2020-08-13 06:52:34.402 INSERT FALSE 5b5e429cf3a174303b2f2192b5d602ed9dedd865
到目前为止一切顺利。
但是如果我跑步:
UPDATE Table1 SET FIELD1 = 1001 WHERE XID = 1;
然后从Table1_History
中选择,我得到:
SELECT * FROM Table1_History;
XID FIELD1 FIELD2 DATECREATED MetaDATA$ACTION MetaDATA$ISUPDATE MetaDATA$ROW_ID
1 1001 String1 2020-08-13 06:52:34.402 INSERT FALSE 23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
2 102 String2 2020-08-13 06:52:34.402 INSERT FALSE 5b5e429cf3a174303b2f2192b5d602ed9dedd865
MetaDATA$ACTION
仍为INSERT
,并且FIELD1
的值现在以1001
的形式存储在流中。我再也看不到任何记录,该行以前的值为101
并且已被更新。
如果我运行以下命令:
DELETE FROM Table1 WHERE XID = 2;
流现在返回:
SELECT * FROM Table1_History;
XID FIELD1 FIELD2 DATECREATED MetaDATA$ACTION MetaDATA$ISUPDATE MetaDATA$ROW_ID
1 1001 String1 2020-08-13 06:52:34.402 INSERT FALSE 23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
现在在数据库的第二行流中我可以看到0条记录。
我没有得到Stream表中用于跟踪UPDATES/DELETES
的位置。这不是使用流吗?
我尝试了以下操作:Snowflake Streams Made Simple,但我仍然不明白。
解决方法
引用Snowflake文档: “流存储表的 current 事务版本,并且在大多数情况下都是CDC记录的适当来源。”
在Snowflake文档中查看以下示例:https://docs.snowflake.com/en/user-guide/streams.html#example-1
我的理解是,在您提前偏移之前,流将仅保留记录的当前版本。因此,如果在前进偏移量之前插入一条记录然后进行更新,则它将显示一次插入,但是字段将保留最新值。
如果您随后推进偏移量并更新或删除记录,则这些事件将显示在流中-尽管如果您更新并删除了同一记录(在偏移之前),则流将仅显示删除,因为该记录的最后一个位置。
更新1 听起来您似乎正在尝试对表中记录的每个更改实施审核跟踪-这不是Streams要做的,而且我认为您无法使用Streams来实现解决方案记录所有更改。
如果您阅读Streams文档,它会指出:“该流可以提供从源表的当前偏移量到当前事务处理时间(即表的当前版本)的一组更改。该流仅维护的增量。更改;如果多个DML语句更改一行,则流仅包含对该行执行的最新操作。”
CDC是专门与加载数据仓库相关的术语,绝不意味着捕获记录的每项更改。
如果您想在Snowflake中创建真正的审核功能,那么恐怕我不知道这是否可行。时间旅行功能显示,Snowflake会保留对记录所做的所有更改(在保留期内),但是我不知道有什么方法可以访问这些更改。我认为您只能在某个时间点访问记录的历史,而无法知道什么时候进行了任何更改
更新2 刚刚意识到Snowflake允许对表进行更改跟踪,而不必使用Streams。如果要捕获表的所有更改,而不仅仅是最新版本,这可能是一个更好的解决方案。该功能在此处记录: https://docs.snowflake.com/en/sql-reference/constructs/changes.html
,好吧,就像@NickW所说的,streams表更多地是关于跟踪偏移量之间的变化。这意味着我仍然可以做我想做的事,但这需要在INSERT
个操作之间的历史记录表中显式地DML
。
首先创建主表,流和“历史记录”表:
CREATE TABLE Table1
(
XID INT IDENTITY PRIMARY KEY,FIELD1 INT,FIELD2 STRING,DATECREATED TIMESTAMP DEFAULT CURRENT_TIMESTAMP::TIMESTAMP
);
CREATE STREAM Table1_Stream ON TABLE Table1;
CREATE TABLE Table1_History
(
UID INT IDENTITY PRIMARY KEY,XID INT,DATECREATED TIMESTAMP,METADATA$ACTION STRING,--METADATA Column from Stream
METADATA$ISUPDATE STRING,--METADATA Column from Stream
DATEINSERTED TIMESTAMP DEFAULT CURRENT_TIMESTAMP::TIMESTAMP
);
然后INSERT
条记录:
INSERT INTO Table1 (FIELD1,FIELD2)
VALUES
(101,'String1'),(102,'String2')
;
“流表”现在显示为:
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
1 101 String1 2020-08-13 06:52:34.402 INSERT FALSE 23bc7a4d83522484f4d7e36edf84b4c7986dfa9b
2 102 String2 2020-08-13 06:52:34.402 INSERT FALSE 5b5e429cf3a174303b2f2192b5d602ed9dedd865
然后从“流”表到“历史记录”表中执行INSERT
:
INSERT INTO Table1_History(XID,FIELD1,FIELD2,DATECREATED,METADATA$ACTION,METADATA$ISUPDATE)
SELECT XID,UPPER(METADATA$ISUPDATE)
FROM Table1_Stream
WHERE METADATA$ACTION <> 'INSERT'
OR METADATA$ISUPDATE = 'TRUE'
请注意,WHERE
子句将只删除INSERT
条记录,除非它是UPDATE
的一部分,DELETEs
然后是INSERTs
条记录。
现在,即使由于WHERE
子句而实际上没有将流记录插入到历史记录表中,如果您查询Stream也会得到NULL:
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
现在,如果您执行UPDATE
,则流将显示它:
UPDATE Table1 SET FIELD1 = 1001 WHERE XID = 1;
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
1 1001 String1 2020-08-14 09:11:20.173 INSERT TRUE 93256f240f338581cc4781c2e79a28075e1b66d7
1 101 String1 2020-08-14 09:11:20.173 DELETE TRUE 93256f240f338581cc4781c2e79a28075e1b66d7
现在运行插入:
INSERT INTO Table1_History(XID,UPPER(METADATA$ISUPDATE)
FROM Table1_Stream
WHERE METADATA$ACTION <> 'INSERT'
OR METADATA$ISUPDATE = 'TRUE'
SELECT * FROM Table1_History;
UID XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE DATEINSERTED
1 1 1001 String1 2020-08-14 09:11:20.173 INSERT TRUE 2020-08-14 09:13:41.474
2 1 101 String1 2020-08-14 09:11:20.173 DELETE TRUE 2020-08-14 09:13:41.474
Stream再次为NULL:
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
如果您运行DELETE
,则更改将再次反映在流中:
DELETE FROM Table1 WHERE XID = 2
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
2 102 String2 2020-08-14 09:11:20.173 DELETE FALSE 51d1d0f5c5bf9c328d79cbbd54a10bf99f73bcd3
它可以INSERT
进入“历史记录”表:
INSERT INTO Table1_History(XID,UPPER(METADATA$ISUPDATE)
FROM Table1_Stream
WHERE METADATA$ACTION <> 'INSERT'
OR METADATA$ISUPDATE = 'TRUE'
SELECT * FROM Table1_History;
UID XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE DATEINSERTED
1 1 1001 String1 2020-08-14 09:11:20.173 INSERT TRUE 2020-08-14 09:13:41.474
2 1 101 String1 2020-08-14 09:11:20.173 DELETE TRUE 2020-08-14 09:13:41.474
4 2 102 String2 2020-08-14 09:11:20.173 DELETE FALSE 2020-08-14 09:17:37.694
Stream再次为NULL:
SELECT * FROM Table1_Stream;
XID FIELD1 FIELD2 DATECREATED METADATA$ACTION METADATA$ISUPDATE METADATA$ROW_ID
我不明白的一件事是,为什么UID
上的Table1_History
增加到4
而不是3
了,但这是一个小问题。
这就是我将跟踪雪花中所有历史变化的方式。