Airflow:通过XCOM创建和传递表列表不以文件形式存储在驱动器上并设置正确的依赖项吗?

问题描述

这是我想要实现的预期流程和依赖项设置:

START ===>创建表列表(仅在触发DAG时一次)===>(通过XCOM)读取并传递表名===>为列表中的每个表动态创建单独的任务===>打印表名称===> END

这是示例代码流:

start = DummyOperator(
        task_id = 'start',dag=dag
)

end = DummyOperator(
        task_id = 'end',dag=dag
)

#create table list:
def create_source_table_list(dsn,uid,pwd,exclude_table_list,**kwargs):
    try:
        cnxn = pyodbc.connect('DSN={};UID={};PWD={}'.format(dsn,pwd))
        cursor=cnxn.cursor()
        tables_list = []
        for row in cursor.tables():
            tables_list.append(row.table_name)
        final_list = [ele for ele in tables_list if ele not in exclude_table_list]
    return final_list

create_table_list = Pythonoperator(
                task_id = 'create_table_list',python_callable=create_source_table_list,provide_context=True,op_args=['DSNNAME','USERID','PASSWORD',['TABLE1','TABLE2']],dag=dag
)

#function for dynamic task generation 
def createDynamicTask(task_id,callableFunction,args):
    task = Pythonoperator(
        task_id = task_id,python_callable = eval(callableFunction),op_kwargs = args,xcom_push = True,email= ['xyz.com'],email_on_failure = True,email_on_retry = False,dag = dag
    )
    return task

#function to print table names
def print_tables(table_name,**kwargs):
    ti = kwargs['ti']
    ls = ti.xcom_pull(task_ids='create_table_list')
    print("The table name is: ",table_name)

for table in create_source_table_list(['DSNNAME','TABLE2']]):

    print_table_names=createDynamicETL('{}-dynamic_task'.format(table),'print_tables',{'table_name':str(table)})

## set dependency
start >> create_table_list
create_table_list >> [print_table_names]
print_table_names >> end

但是,在上述实现中,我面临以下问题/挑战/问题:

  1. 表名不会存储在XCOM中,并且希望避免将表列表存储在驱动器上的文件中。
  2. 数据库中每个表都需要多个数据库调用
  3. 错误的任务依赖项设置-它仅采用并显示列表中最后一个表的依赖项,如下所示: 开始==> create_table_list ==> {列表中的LAST_TABLE_NAME} ==>结束

请提出我在做什么错

谢谢!

解决方法

由于列表不是计算上昂贵的 ,因此建议您确定DAG之外的表列表。然后,您可以动态生成所需的任务。

这有点hacky,但是如果您真的希望将这些表列在Airflow运算符中,则可以考虑执行以下操作:

  • 使用任务来创建单独的(计划外的)DAG,该任务将更新表列表并将其存储在Airflow Variable中。
  • 使用TriggerDagRunOperator触发动态DAG,然后该动态DAG包含基于该Variable内容的任务。

P.S。我猜您正在使用MS SQL。您可以使用MsSqlOperator来查询数据库。