问题描述
需要使用Last_modified_dt时间戳列从Oracle数据源使用Airflow / Python每小时进行一次表刷新/加载。
在Airflow中,存在airflow.models.taskinstance API,该API公开了task_instance元数据表中的数据,并具有以下字段(与示例数据一起显示),假设dag /任务的首次执行日期/时间为1/1/2020 05:00 :-
task_id,dag_id,execution_datetime (of dag),start_date,end_date,duration,state,....
task_a,oracle,1/1/2020 05:00:00,1/1/2020 05:05:00,0.5,success,....
task_b,1/1/2020 05:01:00,1/1/2020 05:04:00,0.3,....
task_c,1/1/202005:00:00,1/1/2020 05:02:00,1/1/2020 05:06:00,0.4,....
因此,我正在考虑使用task_instance元数据表或API来获取每个任务的先前开始日期时间及其状态(成功),并在如下情况下使用它:
因此,在1/1/2020 06:00:00一个小时后运行时:-
select * from table_a where last_mod_dttm > prev(start_datetime of task_id=task_a) and state = sucesss;
select * from table_b where last_mod_dttm > prev(start_datetime of task_id=task_b) and state = sucesss;
select * from table_c where last_mod_dttm > prev(start_datetime of task_id=task_c) and state = sucesss;
这种方法对吗?如果是,那么每次直接查询气流元数据task_instance表以获得任务的前一个或max(start_datetime)会对性能产生影响吗?如果是,那么我们如何通过airflow.models.taskinstance API(https://airflow.readthedocs.io/en/latest/_api/airflow/models/taskinstance/index.html)
获取任务的先前start_datetime和“成功”状态。谢谢!
解决方法
首先,重要的是要了解execution_date
的工作原理,请参见Scheduler Doc:
在计划的时间段内,计划程序不会触发您的任务 结束,例如,将schedule_interval设置为@daily的作业在 一天结束了。此技术可确保无论数据是 该时间段所需的时间在dag之前完全可用 被执行。 在用户界面中,似乎Airflow正在运行您的任务
如果您以一天的schedule_interval运行DAG,则运行 execute_date 2019-11-21在2019-11-21T23:59之后很快触发。
让我们重复一遍,调度程序将您的作业运行一次schedule_interval 在开始日期之后,在该期间的结尾。
这意味着通过引用execution_date
,您可以确切地获得触发上一次运行的时间。
关于查询,我不会查询数据库来获取最后的执行日期,而是使用Airflow内置的宏-see this reference:
您应该只可以在查询中使用{{ execution_date }}
,并且在触发DAG运行时,Airflow应该将其替换。