动态生成的调度间隔?

问题描述

我创建了一个 DAG 来监控其他应用程序,当 DAG 没有完成监控任务时,它会发送警报电子邮件。我的DAG有一个十分钟的间隔调度器,也就是如果任务失败,它每十分钟发送一次电子邮件,但我想要实现的是十分钟给我发送第一封电子邮件,十五分钟第二封电子邮件,第三封三十分钟发邮件,只要前一个任务的条件失败就继续,如果任务成功继续每十分钟监控一次。


current_date = datetime.Now()
default_args = {
    'owner': 'my_dag','start_date':  datetime(2021,3,25,current_date.hour),'timeout': 300,--> seconds
    'poke_interval': 20,--> seconds
    'exponential_backoff': True
    }


with DAG(
    dag_id="Health_Check",schedule_interval="*/10 8-19 * * *",# At every 10th minutes past every hour from 8 through 20.
    tags=["Health_Check"],default_args=default_args,catchup=False
        ) as dag

    do_request = CustomSensor(
        task_id='do_request',endpoint='/Url',method='GET',http_conn_id='conn_api'
        )

我尝试在 DAG 级别实现指数回撤,因为在任务中我使用 exponential_backoff 传感器运算符的这个属性,以便当它尝试生成对 URL 的请求时,如果失败,它会第一次尝试如果失败,则在 20 秒后尝试,如果在 80 秒后失败,则在 40 秒后尝试,并以这种方式继续,直到超时。

我想实现相同的,但在 DAG 级别,有什么想法吗??

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)