使用 pyspark 实现 SCD type2

问题描述

我试图使用 pyspark 实现 SCD 类型 2 并将数据插入 teradata 。我能够生成具有旧历史记录(已存在于数据库中)和新记录的数据框,但是当我执行该数据框的 spark.overwritewith truncatemode = true 时,我可以看到旧的历史数据来自该数据框没有被插入,只有新的记录被插入。例如,下面是一个示例表和数据框。因此,在表中,如果一名员工被提升,那么我们必须为新角色提供一个条目,并且我们必须维护该员工的旧详细信息。就像 Ray 从团队成员晋升为经理一样,最终表应该有 2 个条目,团队成员的当前记录为 0,新指定的当前记录为 1。因此,为了实现这一点,我最终创建了一个包含两个条目的数据框,但是当我尝试使用 truncate mode = 'true' 执行 spark.overwrite 到 teradata 数据库时,我们看到只有新记录被插入,但记录与当前ind = 0 没有被插入。还有一件事,我在插入之前用新的 id 在数据帧本身中生成 id (PK) 列。

**table1**

|id(PK)| emp_id | name | designation | current record ind
|------|--------|------|---|-------------------------
| 1    |101     |Ray   | team member | 1
| 2    |102     |John  | team member | 1


Dataframe 
-----------------

|id    | emp_id | name | designation | current record ind
|------|--------|------|----------------------------
| 3    |101     |Ray   | Manager     | 1
| 4    |102     |John  | team member  | 1
| 5    |101     | Ray  | team member | 0

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)