从 S3 到雪花的雪花 CDC

问题描述

我有 S3 存储桶,每天早上从 oracle 向 S3 获取数据。 通过使用 SNowpipe,我将数据加载到名为 t1 的 SNowflake 表中。 现在我正在创建一个新表 t2,它将包含表 t1 中的 cdc 数据。

我知道我们可以使用任务和流来捕获它。然而,流将捕获记录上的事件插入、更新或删除。但在我们的例子中,我们将数据附加到 t1 中,并尝试将 id 列的更新加载到 t2 中。

例如在第 1 天,表 T1 将被加载

Table t1
id      salary      load_date
--      ------      ----------
1       12000       12-03-2021
2       32000       12-03-2021
3       33000       12-03-2021

表 t2 将直接从 t1 加载

表 t2

id      salary      load_date
--      ------      ----------
1       12000       12-03-2021
2       32000       12-03-2021
3       33000       12-03-2021

现在是第 2 天,因为我们直接将数据附加到表 t1 中。 看起来像这样

id      salary      load_date
--      ------      ----------
1       12000       12-03-2021
2       32000       12-03-2021
3       33000       12-03-2021
5       12500       13-03-2021
2       45000       13-03-2021

现在表 t2 应该有 id 2 的更新值和 id 5 的新值, 如下

id      salary      load_date
--      ------      ----------
1       12000       12-03-2021
2       45000       12-03-2021
3       33000       12-03-2021
5       12500       13-03-2021

我感觉流不会有帮助,因为主表 (t1) 中没有更新, 所以计划在 t1 的表 t2 上使用带有合并语句的任务 类似的东西

CREATE OR REPLACE TASK EMPLOYEES_CDC
  WAREHOUSE = COmpuTE_WH
  SCHEDULE = 'USING CRON 0 9-17 * * SUN America/Los_Angeles'
AS
merge INTO t2 using (select * from t2 where load_date = current_date) t3 on t2.id = t3.id
      when matched then update t2.salary = t1.salary
      when not matched then insert (id,salary,load_date) values (t3.id,t3.salary,t3.load_date)

因为合并看起来有点贵。

请建议这是最佳方法还是有更好的方法

解决方法

您所描述的解决方案是解决此问题的最佳方法。 流肯定会有所帮助,您需要做的就是更改合并语句以使用流而不是表 t1。因此,您的合并语句只会处理增量。

,

为什么不参考这个可以帮助您确定如何应用流的content

您需要确保在外部表上创建流然后使用它。虽然外部表上的流有一些限制,但您也可以检查您的用例。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...