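Problem description
I am iterating over a folder that contains some SQL files. For each file, I want to push its contents as an XCom value, formatting each query with its own specific values.
The code below works, but it stops working once I add the else statement: the value 'not set' overwrites everything.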
directory = r'airflow_home/dags/sql'
for filename in os.listdir(directory):
    with open(os.path.join(directory,filename),'r') as file:
        sqlFile = file.read()
        file.close()
    if filename == 'api_params.sql':
        query = sqlFile.format(partitioned_key,execution_date_second,partitioned_key,next_execution_date_second)
    if filename == 'create_fact_table.sql':
        query = sqlFile.format(fact_table_dest)
    if filename == 'create_geo_table.sql':
        query = sqlFile.format(fact_table_dest)
    if filename == f'{geo_type}'+'.sql':
        query = sqlFile.format(execution_date)
        filename = 'geo_query'
    if filename == 'schema_' + f'{schema}' + '.sql':
        query = sqlFile.format(fact_table_dest,raw_table_dest,execution_date,next_execution_date)
        filename = 'production_query'
    if filename == 'insert_key.sql':
        query = sqlFile.format(raw_table_dest,next_execution_date)
    else:
        query = 'not set'
    task_instance.xcom_push(key=filename,value=query)
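Can someone explain what is happening here?
Solution
You are using multiple if statements, which are evaluated one after another, independently of each other. The else belongs only to the last if statement, so whenever filename is not 'insert_key.sql' it overwrites the query value that an earlier branch set. What you are really looking for is elif; see the Python docs.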
directory = r'airflow_home/dags/sql'
for filename in os.listdir(directory):
    # The with block closes the file automatically, so no explicit close() is needed.
    with open(os.path.join(directory,filename),'r') as file:
        sqlFile = file.read()
    # One if/elif/else chain: only the first matching branch runs,
    # and else runs only when no branch matched.
    if filename == 'api_params.sql':
        query = sqlFile.format(partitioned_key,execution_date_second,partitioned_key,next_execution_date_second)
    elif filename == 'create_fact_table.sql':
        query = sqlFile.format(fact_table_dest)
    elif filename == 'create_geo_table.sql':
        query = sqlFile.format(fact_table_dest)
    elif filename == f'{geo_type}'+'.sql':
        query = sqlFile.format(execution_date)
        filename = 'geo_query'
    elif filename == 'schema_' + f'{schema}' + '.sql':
        query = sqlFile.format(fact_table_dest,raw_table_dest,execution_date,next_execution_date)
        filename = 'production_query'
    elif filename == 'insert_key.sql':
        query = sqlFile.format(raw_table_dest,next_execution_date)
    else:
        query = 'not set'
    task_instance.xcom_push(key=filename,value=query)
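The difference between the two chains can be reproduced without Airflow. The following is a minimal sketch, not the original DAG code: the function names classify_with_ifs and classify_with_elif, the file names a.sql, b.sql and c.sql, and the query strings are all hypothetical and exist only to show which branch wins.

```python
def classify_with_ifs(filename):
    """Separate if statements: the else is paired only with the LAST if."""
    query = None
    if filename == 'a.sql':
        query = 'query A'
    if filename == 'b.sql':
        query = 'query B'
    else:
        # Runs for every filename except 'b.sql', including 'a.sql',
        # so the value assigned above is overwritten.
        query = 'not set'
    return query


def classify_with_elif(filename):
    """A single if/elif/else chain: else runs only when nothing matched."""
    if filename == 'a.sql':
        query = 'query A'
    elif filename == 'b.sql':
        query = 'query B'
    else:
        query = 'not set'
    return query


if __name__ == '__main__':
    # Hypothetical file names, used only to compare the two versions.
    for name in ('a.sql', 'b.sql', 'c.sql'):
        print(name, classify_with_ifs(name), classify_with_elif(name))
```

With separate if statements, only the file handled by the last if keeps its query; with elif, every matched file keeps its own query and only unmatched files fall through to 'not set'.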