如何在Airflow中使用last_mod_dt时间戳从Oracle数据源中按小时执行增量提取?

问题描述

需要使用Last_modified_dt时间戳列从Oracle数据源使用Airflow / Python每小时进行一次表刷新/加载。

在Airflow中,存在airflow.models.taskinstance API,该API公开了task_instance元数据表中的数据,并具有以下字段(与示例数据一起显示),假设dag /任务的首次执行日期/时间为1/1/2020 05:00 :-

task_id,dag_id,execution_datetime (of dag),start_date,end_date,duration,state,....
task_a,oracle,1/1/2020 05:00:00,1/1/2020 05:05:00,0.5,success,....
task_b,1/1/2020 05:01:00,1/1/2020 05:04:00,0.3,....
task_c,1/1/202005:00:00,1/1/2020 05:02:00,1/1/2020 05:06:00,0.4,....

因此,我正在考虑使用task_instance元数据表或API来获取每个任务的先前开始日期时间及其状态(成功),并在如下情况下使用它:

因此,在1/1/2020 06:00:00一个小时后运行时:-

select * from table_a where last_mod_dttm > prev(start_datetime of task_id=task_a) and state = sucesss;
select * from table_b where last_mod_dttm > prev(start_datetime of task_id=task_b) and state = sucesss;
select * from table_c where last_mod_dttm > prev(start_datetime of task_id=task_c) and state = sucesss;

这种方法对吗?如果是,那么每次直接查询气流元数据task_instance表以获得任务的前一个或max(start_datetime)会对性能产生影响吗?如果是,那么我们如何通过airflow.models.taskinstance API(https://airflow.readthedocs.io/en/latest/_api/airflow/models/taskinstance/index.html

获取任务的先前start_datetime和“成功”状态。

谢谢!

解决方法

首先,重要的是要了解execution_date的工作原理,请参见Scheduler Doc

在计划的时间段内,计划程序不会触发您的任务 结束,例如,将schedule_interval设置为@daily的作业在 一天结束了。此技术可确保无论数据是 该时间段所需的时间在dag之前完全可用 被执行。 在用户界面中,似乎Airflow正在运行您的任务

如果您以一天的schedule_interval运行DAG,则运行 execute_date 2019-11-21在2019-11-21T23:59之后很快触发。

让我们重复一遍,调度程序将您的作业运行一次schedule_interval 在开始日期之后,在该期间的结尾。

这意味着通过引用execution_date,您可以确切地获得触发上一次运行的时间。

关于查询,我不会查询数据库来获取最后的执行日期,而是使用Airflow内置的宏-see this reference

您应该只可以在查询中使用{{ execution_date }},并且在触发DAG运行时,Airflow应该将其替换。