问题描述
我有一个雇员表,其中有salary
个employee
使用三角洲湖泊管理。
我可以使用delta lake支持的时间旅行功能,基于 version 或 timestamp 查询表。
SELECT *
FROM DELTA.`EMPLOYEE`
VERSION AS OF 3
但是我想知道在所有增量表版本中对雇员所做的所有更改的历史记录。像这样
SELECT *,timestamp -- From delta table,version -- From delta table
FROM DELTA.`EMPLOYEE`
WHERE EMPLOYEE = 'George'
WITHIN ALL VERSIONS --Never exists but just for understanding
解决方法
这是一个老问题,但今天我偶然发现了它,因为我有一些问题要解决。我认为 Delta (delta.io) 没有为此提供一种方法,因为 Delta 围绕时间旅行旋转到特定时间点而不是一段时间。
但如果我必须得到这个,我想一种方法是直接读取镶木地板文件(忽略增量日志),这将导致记录的所有过去版本/状态(将真空等放在一边)。
现在,如果需要获取每个记录创建的确切版本(这是我的要求),请使用类似
dataframe.withColumn("input_file",input_file_name())
这将显示记录来自的确切文件名。
现在查询 .json _delta_log 事务文件,它会告诉我们哪个版本添加了哪个文件,就像这样
>>> details = spark.read.json('/data/gcs/delta/ingest/bigtable/_delta_log/*.json')
>>> details = details.select(col('add')['path'].alias("file_path")).withColumn("version",substring(input_file_name(),-6,1)).filter("file_path is not NULL")
>>> details.show(5,100)
+-------------------------------------------------------------------+-------+
| file_path|version|
+-------------------------------------------------------------------+-------+
|part-00000-148c98cc-0db1-495e-bb67-0ba1cc4fd45e-c000.snappy.parquet| 4|
|part-00001-2caa89b7-c990-47e0-b7b0-92430b15b141-c000.snappy.parquet| 4|
|part-00002-1f900af7-d819-48e9-a048-ad22e5c7ce65-c000.snappy.parquet| 4|
|part-00003-e043f466-861b-47f0-a1cf-4b67e75a5ed2-c000.snappy.parquet| 4|
|part-00000-93cc0747-ca0b-46ef-ada4-b3fb18e48925-c000.snappy.parquet| 0|
+-------------------------------------------------------------------+-------+
only showing top 5 rows
在 file_path 上加入这两个数据帧,您将看到记录的每个状态/版本以及它在其中创建的增量版本。我的示例 -
parquet_table = spark.read.parquet('/data/gcs/delta/ingest/bigtable/*.parquet')
>>> parquet_table.printSchema()
root
|-- Region: string (nullable = true)
|-- Country: string (nullable = true)
|-- Item_Type: string (nullable = true)
|-- Sales_Channel: string (nullable = true)
parquet_table = parquet_table.where(col("Order_ID")==913712584).\
withColumn("input_file",38,1000)).\
select(["Order_ID","Region","Country","Sales_Channel","input_file"]).\
orderBy("Country")
>>> parquet_table.join(details,parquet_table.input_file == details.file_path).select("Order_ID","version").orderBy("version").show(100)
+---------+------------------+-------+-------------+-------+
| Order_ID| Region|Country|Sales_Channel|version|
+---------+------------------+-------+-------------+-------+
|913712584|Sub-Saharan Africa|Lesotho| Online| 0|
|913712584|Sub-Saharan Africa|Lesotho| Online| 0|
|913712584|Sub-Saharan Africa|Lesotho| Online| 0|
,
在 Databricks 上,从 Databricks Runtime 8.2 开始,有一个名为 Change Data Feed 的功能可以跟踪对表所做的更改,您可以以批处理或流的形式提取更改源以进行分析或实施更改数据捕获式处理。
在表上启用更改数据提要后,您可以使用批处理或流 API 读取数据,如下所示:
spark.read.format("delta") \
.option("readChangeFeed","true") \
.option("startingVersion",0) \
.table("myDeltaTable")
并且您将获得带有 additional columns 的所有更改记录,这些记录描述了进行了哪些更改(插入/更新/删除)、何时发生(时间戳)以及在哪个版本中。