调度Airflow DAG仅在星期一至星期五(即仅工作日)运行

问题描述

我有一个执行Python脚本的DAG,该脚本接受一个date参数(当前日期)。我将DAG安排在星期一至星期五(即工作日)的东部标准时间上午6:00运行。DAG必须在星期一以星期一的日期作为参数运行Python脚本,星期二同理,依此类推,直到星期五以星期五的日期作为参数运行。

我注意到使用调度间隔'0 6 * * 1-5'行不通,因为星期五的那次运行要等到下一个星期一才会执行。

我将调度间隔更改为'0 6 * * *',以每天早上6:00运行,并在DAG的开头筛选出符合'0 6 * * 1-5'的日期,因此实际上只在星期一到星期五生效。对于周六和周日,应跳过下游任务。

这是我的代码

from __future__ import print_function
import pendulum
import logging
from airflow.models import DAG
from airflow.models import Variable
from datetime import datetime,timedelta
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
from croniter import croniter


log = logging.getLogger(__name__)


def filter_processing_date(**context):
    """Tell whether the run's execution date is a tick of the weekday cron.

    Intended as the ``python_callable`` of the ``weekdays_only``
    ShortCircuitOperator: a falsy result short-circuits the downstream tasks.

    Args:
        **context: Task context passed by Airflow; only the
            ``execution_date`` entry is read.

    Returns:
        bool: True when ``execution_date`` is exactly one of the instants
        produced by the cron expression ``0 6 * * 1-5``, False otherwise.
    """
    execution_date = context['execution_date']
    cron = croniter('0 6 * * 1-5', execution_date)
    log.info('cron is: {}'.format(cron))
    log.info('execution date is: {}'.format(execution_date))

    # get_next()/get_prev() are methods of the croniter iterator, not of the
    # datetime objects they return, so they cannot be chained on the result.
    # Step the iterator forward once, then back once: if execution_date sits
    # exactly on a schedule tick, stepping back lands on it again.
    cron.get_next(datetime)
    previous_tick = cron.get_prev(datetime)

    # NOTE(review): the cron expression is evaluated in the timezone carried
    # by execution_date. If Airflow hands it over in UTC while the DAG is
    # scheduled at 06:00 America/New_York, this comparison will never match;
    # confirm and convert execution_date to the DAG timezone first if so.
    return execution_date == previous_tick


# Timezone used to make the DAG start date timezone-aware.
local_tz = pendulum.timezone("America/New_York")

# DAG parameters: passed to the DAG below as its default_args.
default_args = {
    'owner': 'Managed Services',
    'depends_on_past': False,
    'start_date': datetime(2020, 8, 3, tzinfo=local_tz),
    'dagrun_timeout': None,
    'email': Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'provide_context': True,
    'retries': 12,
    'retry_delay': timedelta(minutes=5),
}

# Runs every day at 06:00; the weekdays_only gate decides whether the actual
# work is executed or skipped for that day.
with DAG(
    'execute_python',
    schedule_interval='0 6 * * *',
    default_args=default_args,
) as dag:

    # Operators created inside the ``with`` block are attached to ``dag``
    # by the context manager, so no explicit ``dag=`` argument is needed.
    start_dummy = DummyOperator(task_id='start')

    # TriggerRule members are upper-case constants: NONE_FAILED, not
    # NONE_Failed (which raises AttributeError when the file is parsed).
    end_dummy = DummyOperator(
        task_id='end',
        trigger_rule=TriggerRule.NONE_FAILED,
    )

    # Gate: downstream tasks only run when filter_processing_date is truthy.
    weekdays_only = ShortCircuitOperator(
        task_id='weekdays_only',
        python_callable=filter_processing_date,
    )

    # Runs the processing script over SSH, passing the templated
    # ``ds_nodash`` value as the -d argument.
    run_python = SSHOperator(
        ssh_conn_id="oci_connection",
        task_id='run_python',
        command='/usr/bin/python3  /home/sb/local/bin/runProcess.py -d {{ ds_nodash }}',
    )

    start_dummy >> weekdays_only >> run_python >> end_dummy

不幸的是,weekdays_only任务失败,并显示以下错误消息。怎么了?

result

Airflow error message

Airflow版本:v1.10.9-composer

Python 3。

解决方法

我东拼西凑地弄出了一个办法,解决了自己的问题。检查下一个执行日期是否是工作日,如果是,则返回true,否则返回false。我在ShortCircuitOperator中调用该函数,如果为true,则继续处理下游任务,如果为false,则跳过这些任务。

代码如下,不过我也乐于接受更好的解决方案。

from __future__ import print_function
import pendulum
import logging
from airflow.models import DAG
from airflow.models import Variable
from datetime import datetime,timedelta
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule


log = logging.getLogger(__name__)


def checktheday(**context):
    """Decide whether the next execution date falls on a working day.

    Args:
        **context: Keyword context; only ``next_execution_date`` is read and
            it must provide a ``weekday()`` method.

    Returns:
        bool: True when ``weekday()`` is 0-4 (Monday to Friday), False
        when it is 5 or 6 (Saturday or Sunday).
    """
    upcoming = context['next_execution_date']
    log.info('next_execution_date is: {}'.format(upcoming))

    date_check = upcoming.weekday()
    log.info('date_check is: {}'.format(date_check))

    # weekday() numbers Monday as 0, so 0..4 are the working days.
    decision = date_check in (0, 1, 2, 3, 4)
    log.info('decision is: {}'.format(decision))
    return decision


# Timezone used to make the DAG start date timezone-aware.
local_tz = pendulum.timezone("America/New_York")

# DAG parameters: passed to the DAG below as its default_args.
default_args = {
    'owner': 'Managed Services',
    'depends_on_past': False,
    'start_date': datetime(2020, 8, 3, tzinfo=local_tz),
    'dagrun_timeout': None,
    'email': Variable.get('email'),
    'email_on_failure': True,
    'email_on_retry': False,
    'provide_context': True,
    'retries': 12,
    'retry_delay': timedelta(minutes=5),
}

# Runs every day at 06:00; the weekdays_only gate decides whether the actual
# work is executed or skipped for that day.
with DAG(
    'execute_python',
    schedule_interval='0 6 * * *',
    default_args=default_args,
) as dag:

    # Operators created inside the ``with`` block are attached to ``dag``
    # by the context manager, so no explicit ``dag=`` argument is needed.
    start_dummy = DummyOperator(task_id='start')

    end_dummy = DummyOperator(
        task_id='end',
        trigger_rule=TriggerRule.NONE_FAILED,
    )

    # Gate: downstream tasks only run when checktheday returns a truthy value.
    weekdays_only = ShortCircuitOperator(
        task_id='weekdays_only',
        python_callable=checktheday,
    )

    # Runs the processing script over SSH; the -d argument is the templated
    # ``ds`` value shifted by one day and reformatted from %Y-%m-%d to %Y%m%d.
    run_python = SSHOperator(
        ssh_conn_id="oci_connection",
        task_id='run_python',
        command='/usr/bin/python3  /home/sb/local/bin/runProcess.py -d {{ macros.ds_format(macros.ds_add(ds,1),"%Y-%m-%d","%Y%m%d") }}',
    )

    # Linear pipeline: start -> weekdays_only -> run_python -> end.
    start_dummy >> weekdays_only
    weekdays_only >> run_python
    run_python >> end_dummy