Spark SQL在不降低历史状态的情况下生成SCD2

问题描述

来自关系数据库的数据被加载到spark中-应该是每天,但实际上不是每天。此外,它是数据库的完整副本-无增量加载。

为了将维度表与主要事件数据轻松地连接在一起,我想:

  • 删除重复数据(即提高以后广播加入的可能性)
  • 具有valid_to / valid_from列,因此即使每天(不一致)都无法获得数据,仍然可以很好地使用(从下游)

我正在使用spark 3.0.1,并且希望以SCD2样式转换现有数据-不会丢失历史记录。

spark-shell

import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Window

case class Foo (key:Int,value:Int,date:String)
val d = Seq(Foo(1,1,"20200101"),Foo(1,8,"20200102"),9,"20200120"),"20200121"),"20200122"),"20200103"),Foo(2,5,10,"20200113")).toDF
d.show

val windowDeduplication =  Window.partitionBy("key","value").orderBy("key","date")
val windowPrimaryKey = Window.partitionBy("key").orderBy("key","date")

val nextThing = lead("date",1).over(windowPrimaryKey)
d.withColumn("date",to_date(col("date"),"yyyyMMdd")).withColumn("rank",rank().over(windowDeduplication)).filter(col("rank") === 1).drop("rank").withColumn("valid_to",nextThing).withColumn("valid_to",when(nextThing.isNotNull,date_sub(nextThing,1)).otherwise(current_date)).withColumnRenamed("date","valid_from").orderBy("key","valid_from","valid_to").show

导致:

+---+-----+----------+----------+
|key|value|valid_from|  valid_to|
+---+-----+----------+----------+
|  1|    1|2020-01-01|2020-01-01|
|  1|    8|2020-01-02|2020-01-12|
|  1|   10|2020-01-13|2020-01-19|
|  1|    9|2020-01-20|2020-10-09|
|  2|    5|2020-01-01|2020-10-09|
+---+-----+----------+----------+

这已经相当不错了。但是:

|  1|    1|2020-01-03|   2|2020-01-12|

迷路了。 即稍后(中间更改后)再次出现的所有值都将丢失。 如何在不保持较大排名的情况下保留该行:

d.withColumn("date",rank().over(windowDeduplication)).withColumn("valid_to","valid_to").show

+---+-----+----------+----+----------+
|key|value|valid_from|rank|  valid_to|
+---+-----+----------+----+----------+
|  1|    1|2020-01-01|   1|2020-01-01|
|  1|    8|2020-01-02|   1|2020-01-02|
|  1|    1|2020-01-03|   2|2020-01-12|
|  1|   10|2020-01-13|   1|2020-01-19|
|  1|    9|2020-01-20|   1|2020-01-20|
|  1|    9|2020-01-21|   2|2020-01-21|
|  1|    9|2020-01-22|   3|2020-10-09|
|  2|    5|2020-01-01|   1|2020-10-09|
+---+-----+----------+----+----------+

绝对不想要的

  • 想法是删除重复项
  • 但是请使用valid_to,valid_from
  • 保留对数据的任何历史性更改

如何正确地将其转换为SCD2表示形式,即具有有效_自,有效_至但不丢弃中间状态?

注意:我不需要更新现有数据(合并为JOIN)。可以重新创建/覆盖它。

Implement SCD Type 2 in Spark似乎太复杂了。在我的情况下,是否有不需要状态处理的更好方法?即我有源自数据库的每日完整副本的数据,并且想对它进行重复数据删除

解决方法

以前的方法仅保留副本的第一个(最早的)版本。我认为,没有连接的唯一解决方案是使用窗口函数,该函数将每个值与上一行进行比较-如果整行中没有变化,则将其丢弃。

效率可能较低-但更准确。但这还取决于手头的用例,即再次看到更改后的值的可能性。

import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Window

case class Foo (key:Int,value:Int,value2:Int,date:String)
val d = Seq(Foo(1,1,"20200101"),Foo(1,8,"20200102"),9,"20200120"),6,"20200121"),"20200122"),"20200103"),Foo(2,5,10,"20200113"),"20210120"),"20220121"),3,"20230122")).toDF

def compare2Rows(key:Seq[String],sortChangingIgnored:Seq[String],timeColumn:String)(df:DataFrame):DataFrame = {
    val windowPrimaryKey = Window.partitionBy(key.map(col):_*).orderBy(sortChangingIgnored.map(col):_*)
    val columnsToCompare = df.drop(key ++ sortChangingIgnored:_*).columns

    val nextDataChange = lead(timeColumn,1).over(windowPrimaryKey)

    val deduplicated = df.withColumn("data_changes",columnsToCompare.map(e=> col(e) =!= lead(col(e),1).over(windowPrimaryKey)).reduce(_ or _)).filter(col("data_changes").isNull or col("data_changes"))
    deduplicated.withColumn("valid_to",when(nextDataChange.isNotNull,date_sub(nextDataChange,1)).otherwise(current_date)).withColumnRenamed("date","valid_from").drop("data_changes")
}
d.orderBy("key","date").show
d.withColumn("date",to_date(col("date"),"yyyyMMdd")).transform(compare2Rows(Seq("key"),Seq("date"),"date")).orderBy("key","valid_from","valid_to").show

返回:

+---+-----+------+----------+----------+
|key|value|value2|valid_from|  valid_to|
+---+-----+------+----------+----------+
|  1|    1|     1|2020-01-01|2020-01-01|
|  1|    8|     1|2020-01-02|2020-01-02|
|  1|    1|     1|2020-01-03|2020-01-12|
|  1|   10|     1|2020-01-13|2020-01-19|
|  1|    9|     1|2020-01-20|2020-01-20|
|  1|    6|     1|2020-01-21|2022-01-20|
|  1|    9|     1|2022-01-21|2023-01-21|
|  1|    9|     3|2023-01-22|2020-10-09|
|  2|    5|     1|2020-01-01|2020-10-09|
+---+-----+------+----------+----------+

输入以下内容:

+---+-----+------+--------+
|key|value|value2|    date|
+---+-----+------+--------+
|  1|    1|     1|20200101|
|  1|    8|     1|20200102|
|  1|    1|     1|20200103|
|  1|   10|     1|20200113|
|  1|    9|     1|20200120|
|  1|    6|     1|20200121|
|  1|    9|     1|20200122|
|  1|    9|     1|20210120|
|  1|    9|     1|20220121|
|  1|    9|     3|20230122|
|  2|    5|     1|20200101|
+---+-----+------+--------+

此功能的缺点是,对于每个键,都会建立无限量的状态……但是当我计划将其应用于较小的维度表时,我认为还是应该没问题。