AIRFLOW-JINJA未在DAG中渲染

问题描述

我正在创建一个传递任意数量的运算符和参数并创建DAG的函数,我在渲染jinja时遇到问题,我在相关的运算符中更新了template_fields,但是没有人遇到过运用这个吗?

def date_offset(active_day_interval,dag,opr_array=[]
                   ):

    with dag:
        for i in range(active_day_interval):
            run_check = CheckOperator(
                task_id=f'{dag.dag_id}_run_check_{i}',day_offset=i,dag=dag
            )

            params = {
                "date": "{{ ds }}"
            }

            task_list = []
            for opr in opr_array:

                new_id = opr['class_args']['task_id'] + f"_{i}"
                op_params = opr['class_args'].copy()
                op_params.update({
                    "task_id": new_id,"day_offset": i,"dag": dag,"xcom_loc": new_id,"params": params
                })
                operator = opr['class_name']
                task_operator = operator(**op_params)
                task_list.append(task_operator)

            chain(run_check,*task_list)

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)