问题描述
来自关系数据库的数据被加载到spark中-应该是每天,但实际上不是每天。此外,它是数据库的完整副本-无增量加载。
为了将维度表与主要事件数据轻松地连接在一起,我想:
- 删除重复数据(即提高以后广播加入的可能性)
- 具有valid_to / valid_from列,因此即使每天(不一致)都无法获得数据,仍然可以很好地使用(从下游)
我正在使用spark 3.0.1,并且希望以SCD2样式转换现有数据-不会丢失历史记录。
spark-shell
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Window
case class Foo (key:Int,value:Int,date:String)
val d = Seq(Foo(1,1,"20200101"),Foo(1,8,"20200102"),9,"20200120"),"20200121"),"20200122"),"20200103"),Foo(2,5,10,"20200113")).toDF
d.show
val windowDeduplication = Window.partitionBy("key","value").orderBy("key","date")
val windowPrimaryKey = Window.partitionBy("key").orderBy("key","date")
val nextThing = lead("date",1).over(windowPrimaryKey)
d.withColumn("date",to_date(col("date"),"yyyyMMdd")).withColumn("rank",rank().over(windowDeduplication)).filter(col("rank") === 1).drop("rank").withColumn("valid_to",nextThing).withColumn("valid_to",when(nextThing.isNotNull,date_sub(nextThing,1)).otherwise(current_date)).withColumnRenamed("date","valid_from").orderBy("key","valid_from","valid_to").show
导致:
+---+-----+----------+----------+
|key|value|valid_from| valid_to|
+---+-----+----------+----------+
| 1| 1|2020-01-01|2020-01-01|
| 1| 8|2020-01-02|2020-01-12|
| 1| 10|2020-01-13|2020-01-19|
| 1| 9|2020-01-20|2020-10-09|
| 2| 5|2020-01-01|2020-10-09|
+---+-----+----------+----------+
这已经相当不错了。但是:
| 1| 1|2020-01-03| 2|2020-01-12|
迷路了。 即稍后(中间更改后)再次出现的所有值都将丢失。 如何在不保持较大排名的情况下保留该行:
d.withColumn("date",rank().over(windowDeduplication)).withColumn("valid_to","valid_to").show
+---+-----+----------+----+----------+
|key|value|valid_from|rank| valid_to|
+---+-----+----------+----+----------+
| 1| 1|2020-01-01| 1|2020-01-01|
| 1| 8|2020-01-02| 1|2020-01-02|
| 1| 1|2020-01-03| 2|2020-01-12|
| 1| 10|2020-01-13| 1|2020-01-19|
| 1| 9|2020-01-20| 1|2020-01-20|
| 1| 9|2020-01-21| 2|2020-01-21|
| 1| 9|2020-01-22| 3|2020-10-09|
| 2| 5|2020-01-01| 1|2020-10-09|
+---+-----+----------+----+----------+
绝对不想要的
- 想法是删除重复项
- 但是请使用valid_to,valid_from 保留对数据的任何历史性更改
如何正确地将其转换为SCD2表示形式,即具有有效_自,有效_至但不丢弃中间状态?
注意:我不需要更新现有数据(合并为JOIN)。可以重新创建/覆盖它。
即Implement SCD Type 2 in Spark似乎太复杂了。在我的情况下,是否有不需要状态处理的更好方法?即我有源自数据库的每日完整副本的数据,并且想对它进行重复数据删除。
解决方法
以前的方法仅保留副本的第一个(最早的)版本。我认为,没有连接的唯一解决方案是使用窗口函数,该函数将每个值与上一行进行比较-如果整行中没有变化,则将其丢弃。
效率可能较低-但更准确。但这还取决于手头的用例,即再次看到更改后的值的可能性。
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Window
case class Foo (key:Int,value:Int,value2:Int,date:String)
val d = Seq(Foo(1,1,"20200101"),Foo(1,8,"20200102"),9,"20200120"),6,"20200121"),"20200122"),"20200103"),Foo(2,5,10,"20200113"),"20210120"),"20220121"),3,"20230122")).toDF
def compare2Rows(key:Seq[String],sortChangingIgnored:Seq[String],timeColumn:String)(df:DataFrame):DataFrame = {
val windowPrimaryKey = Window.partitionBy(key.map(col):_*).orderBy(sortChangingIgnored.map(col):_*)
val columnsToCompare = df.drop(key ++ sortChangingIgnored:_*).columns
val nextDataChange = lead(timeColumn,1).over(windowPrimaryKey)
val deduplicated = df.withColumn("data_changes",columnsToCompare.map(e=> col(e) =!= lead(col(e),1).over(windowPrimaryKey)).reduce(_ or _)).filter(col("data_changes").isNull or col("data_changes"))
deduplicated.withColumn("valid_to",when(nextDataChange.isNotNull,date_sub(nextDataChange,1)).otherwise(current_date)).withColumnRenamed("date","valid_from").drop("data_changes")
}
d.orderBy("key","date").show
d.withColumn("date",to_date(col("date"),"yyyyMMdd")).transform(compare2Rows(Seq("key"),Seq("date"),"date")).orderBy("key","valid_from","valid_to").show
返回:
+---+-----+------+----------+----------+
|key|value|value2|valid_from| valid_to|
+---+-----+------+----------+----------+
| 1| 1| 1|2020-01-01|2020-01-01|
| 1| 8| 1|2020-01-02|2020-01-02|
| 1| 1| 1|2020-01-03|2020-01-12|
| 1| 10| 1|2020-01-13|2020-01-19|
| 1| 9| 1|2020-01-20|2020-01-20|
| 1| 6| 1|2020-01-21|2022-01-20|
| 1| 9| 1|2022-01-21|2023-01-21|
| 1| 9| 3|2023-01-22|2020-10-09|
| 2| 5| 1|2020-01-01|2020-10-09|
+---+-----+------+----------+----------+
输入以下内容:
+---+-----+------+--------+
|key|value|value2| date|
+---+-----+------+--------+
| 1| 1| 1|20200101|
| 1| 8| 1|20200102|
| 1| 1| 1|20200103|
| 1| 10| 1|20200113|
| 1| 9| 1|20200120|
| 1| 6| 1|20200121|
| 1| 9| 1|20200122|
| 1| 9| 1|20210120|
| 1| 9| 1|20220121|
| 1| 9| 3|20230122|
| 2| 5| 1|20200101|
+---+-----+------+--------+
此功能的缺点是,对于每个键,都会建立无限量的状态……但是当我计划将其应用于较小的维度表时,我认为还是应该没问题。