问题描述
我想将气流变量传递给这样的SQL查询模板文件(在sql / test.sql文件中):
select 'test','{{ params.test_ds }}','{{ test_dt }}' from test_table;
我创建了一个继承自PostgresOperator的Operator:
class EtlOperator(PostgresOperator):
template_fields = ('sql','test_dt','params')
template_ext = PostgresOperator.template_ext
@apply_defaults
def __init__(self,test_dt,params,*args,**kwargs):
super(EtlRunIdOperator,self).__init__(*args,**kwargs)
self.test_dt = test_dt
self.params = params
def execute(self,context):
super(EtlRunIdOperator,self).execute(context)
我创建了此任务:
test_task00 = EtlOperator(
task_id=f'test_task00',postgres_conn_id='redshift',sql='sql/test.sql',params={
'test_ds': '{{ ds }}'
},database='default',test_dt='{{ execution_date }}',provide_context=True,# tried without it too
dag=dag
)
但是,无论template_fields的参数或test_dt是什么,sql仍不会解析变量,结果如下:
INFO - Executing: select 'test','{{ ds }}','' from test_table;
我的配置哪里出问题了?
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)