问题描述
问题
主要问题
我怎么能短暂地从每日摘录的文件夹中实现缓慢变化的维度类型2,其中每个csv是来自源系统的表的完整摘录?
理性
我们正在将临时数据仓库设计为面向最终用户的数据集市,这些数据集可以随意旋转和销毁。这要求我们将所有数据都存储在湖泊/斑点/桶中。
我们每天提取全部摘录,因为:
- 我们不能可靠地仅提取变更集(由于无法控制的原因),并且
- 我们希望维护一个包含“最少”可能数据的数据湖。
挑战问题
有没有一种解决方案可以给我特定日期的状态,而不仅仅是“最新”状态?
存在的问题
我是不是完全想着这件事,有更简单的方法吗?
可能的方法
自定义dbt
实现
dbt.utils
软件包中有一个insert_by_period
dbt实现,我想这可能正是我想要的吗?但是我很困惑,因为它是dbt snapshot
,但是:
- 一次为每个文件递增运行
dbt snapshot
;并且, - 是直接在外部表上构建的吗?
三角洲湖
我对Databricks的三角洲湖泊了解不多,但是似乎Delta Tables应该可以实现吗?
修复提取作业
如果我们使提取物仅包含自上次提取物以来发生的变化,那么是否解决了我们的障碍?
示例
假设以下三个文件位于数据湖的文件夹中。 (Gist with the 3 csvs and desired table outcome as csv)。 我添加了Extracted列,以防从文件名中解析时间戳太麻烦了。
2020-09-14_CRM_extract.csv
| OppId | CustId | Stage | Won | LastModified | Extracted |
|-------|--------|-------------|-----|--------------|-----------|
| 1 | A | 2 - Qualify | | 9/1 | 9/14 |
| 2 | B | 3 - Propose | | 9/12 | 9/14 |
2020-09-15_CRM_extract.csv
| OppId | CustId | Stage | Won | LastModified | Extracted |
|-------|--------|-------------|-----|--------------|-----------|
| 1 | A | 2 - Qualify | | 9/1 | 9/15 |
| 2 | B | 4 - Closed | Y | 9/14 | 9/15 |
| 3 | C | 1 - Lead | | 9/14 | 9/15 |
2020-09-16_CRM_extract.csv
| OppId | CustId | Stage | Won | LastModified | Extracted |
|-------|--------|-------------|-----|--------------|-----------|
| 1 | A | 2 - Qualify | | 9/1 | 9/16 |
| 2 | B | 4 - Closed | Y | 9/14 | 9/16 |
| 3 | C | 2 - Qualify | | 9/15 | 9/16 |
最终结果
以下是截至9/16的三个文件的SCD-II。截至9/15,SCD-II相同,但OppId=3
仅包含valid_from=9/15
和valid_to=null
| OppId | CustId | Stage | Won | LastModified | valid_from | valid_to |
|-------|--------|-------------|-----|--------------|------------|----------|
| 1 | A | 2 - Qualify | | 9/1 | 9/14 | null |
| 2 | B | 3 - Propose | | 9/12 | 9/14 | 9/15 |
| 2 | B | 4 - Closed | Y | 9/14 | 9/15 | null |
| 3 | C | 1 - Lead | | 9/14 | 9/15 | 9/16 |
| 3 | C | 2 - Qualify | | 9/15 | 9/16 | null |
解决方法
有趣的概念,当然,与该论坛相比,对话可能需要更长的时间才能完全了解您的业务,利益相关者,数据等。我可以看到,如果您的数据量相对较小,则可能有用系统很少更改,您的报告要求(因此,数据集市)也很少更改,您只需要很少地启动这些数据集市。
我担心的是:
- 如果您的源需求或目标需求发生了变化,您将如何处理?您将需要提升数据集市,对其进行全面的回归测试,应用更改,然后进行测试。如果您在知道更改时/在知道更改的情况下执行此操作,那么对于不使用的Datamart来说会付出很多努力-特别是如果您需要在两次使用之间多次执行此操作;如果您在需要数据集市时执行此操作,那么您将无法达到使数据集市可用于“即时”使用的目标。
您的陈述“我们拥有一个DW,可以删除,更新和重新创建它的代码,而没有传统DW变更管理所带来的复杂性”,我不确定是真的。您将如何测试代码更新,而又不搞乱数据集市并经历数据的标准测试周期-那么这与传统的DW变更管理有何不同?
- 如果源系统中的数据损坏/意外,会发生什么?在每天要加载数据的“正常” DW中,通常会在当天注意到并修复该问题。在您的解决方案中,不可靠的数据可能在数天/周前发生,并且假设它已加载到数据集市中,而不是在加载时出错,则需要适当的流程来发现它,然后可能必须拆散数天的SCD记录才能解决问题。
- (仅当您有大量数据时才有意义)鉴于存储成本低,我不确定是否需要在需要时拆分数据集市而不是仅保留数据以备使用。每次启动数据集市时,加载大量数据将既耗时又昂贵。可能的混合方法可能是仅在需要数据集市时才运行增量负载,而不是每天运行它们-因此您可以随时使用上次使用数据集市时的数据,并且只需添加自最后一次加载
我不知道这是否是最好的,但是我已经看到了。建立初始SCD-II表时,添加一列,该列是记录的所有值的存储HASH()
值(可以排除主键)。然后,您可以每天在传入的完整数据集上创建一个外部表,其中包括相同的HASH()
函数。现在,您可以基于主键以及HASH值是否已更改,对SCD-II执行MERGE
或INSERT/UPDATE
。
以这种方式进行处理的主要优点是,您避免每天将所有数据加载到Snowflake中进行比较,但是这种方式执行起来会较慢。您还可以使用HASH()
语句中包含的COPY INTO
函数加载到临时表,然后更新SCD-II,然后删除该临时表,这实际上可能会更快。