问题描述
我正在创建一个传递任意数量的运算符和参数并创建DAG的函数,我在渲染jinja时遇到问题,我在相关的运算符中更新了template_fields
,但是没有人遇到过运用这个吗?
def date_offset(active_day_interval,dag,opr_array=[]
):
with dag:
for i in range(active_day_interval):
run_check = CheckOperator(
task_id=f'{dag.dag_id}_run_check_{i}',day_offset=i,dag=dag
)
params = {
"date": "{{ ds }}"
}
task_list = []
for opr in opr_array:
new_id = opr['class_args']['task_id'] + f"_{i}"
op_params = opr['class_args'].copy()
op_params.update({
"task_id": new_id,"day_offset": i,"dag": dag,"xcom_loc": new_id,"params": params
})
operator = opr['class_name']
task_operator = operator(**op_params)
task_list.append(task_operator)
chain(run_check,*task_list)
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)