xcom值在if / else循环中被else语句覆盖

问题描述

我正在遍历包含一些sql文件文件夹。对于每个文件,我都希望将其作为xcom值推送,并为每个查询提供特定的值。

下面的代码可以正常工作,但是添加else语句时不起作用。 not set的价值正在覆盖一切。

directory = r'airflow_home/dags/sql'
for filename in os.listdir(directory):
    with open(os.path.join(directory,filename),'r') as file:
        sqlFile = file.read()
        file.close()
        if filename == 'api_params.sql':
            query = sqlFile.format(partitioned_key,execution_date_second,partitioned_key,next_execution_date_second)
        if filename == 'create_fact_table.sql':
            query = sqlFile.format(fact_table_dest)
        if filename == 'create_geo_table.sql':
            query = sqlFile.format(fact_table_dest)
        if filename == f'{geo_type}'+'.sql':
            query = sqlFile.format(execution_date)
            filename = 'geo_query'
        if filename == 'schema_' + f'{schema}' + '.sql':
            query = sqlFile.format(fact_table_dest,raw_table_dest,execution_date,next_execution_date)
            filename = 'production_query'
        if filename == 'insert_key.sql':
            query = sqlFile.format(raw_table_dest,next_execution_date)
        else:
            query = 'not set'
        task_instance.xcom_push(key=filename,value=query)

有人可以解释一下这里发生了什么吗?

解决方法

您正在使用多个if语句,这些语句一个接一个地执行。 else仅引用最后的if语句,因此将覆盖先前设置的query参数。您真正要寻找的是elif-请参见Python Docs

directory = r'airflow_home/dags/sql'
for filename in os.listdir(directory):
    with open(os.path.join(directory,filename),'r') as file:
        sqlFile = file.read()
        file.close()
        if filename == 'api_params.sql':
            query = sqlFile.format(partitioned_key,execution_date_second,partitioned_key,next_execution_date_second)
        elif filename == 'create_fact_table.sql':
            query = sqlFile.format(fact_table_dest)
        elif filename == 'create_geo_table.sql':
            query = sqlFile.format(fact_table_dest)
        elif filename == f'{geo_type}'+'.sql':
            query = sqlFile.format(execution_date)
            filename = 'geo_query'
        elif filename == 'schema_' + f'{schema}' + '.sql':
            query = sqlFile.format(fact_table_dest,raw_table_dest,execution_date,next_execution_date)
            filename = 'production_query'
        elif filename == 'insert_key.sql':
            query = sqlFile.format(raw_table_dest,next_execution_date)
        else:
            query = 'not set'
        task_instance.xcom_push(key=filename,value=query)