气流-重用任务

问题描述

我将某些表从Postgresql导出到GCS。为了使它看起来简单,我创建了如下所示的dag。

enter image description here

出口达人是这个。

from airflow.models import DAG
from airflow.contrib.operators.postgres_to_gcs_operator import  PostgresToGoogleCloudStorageOperator

def sub_dag_export(parent_dag_name,child_dag_name,args,export_suffix):
    dag = DAG(
      '%s.%s' % (parent_dag_name,child_dag_name),default_args=args,start_date=args['start_date'],max_active_runs=1,)
    export_tbl1 = PostgresToGoogleCloudStorageOperator(
        task_id='export_tbl1',postgres_conn_id='cloudsqlpg',google_cloud_storage_conn_id='gcsconn',sql='SELECT * FROM tbl1',export_format='csv',field_delimiter='|',bucket='dsrestoretest',filename='file/export_tbl1/tbl1_{}.csv',schema_filename='file/schema/tbl1.json',dag=dag)

        export_tbl1 = PostgresToGoogleCloudStorageOperator(
        task_id='export_tbl2',sql='SELECT * FROM tbl2',filename='file/export_tbl1/tbl2_{}.csv',schema_filename='file/schema/tbl2.json',dag=dag)

任务1和2都执行相同的工作,因此我想对所有表重用export1任务。但这不应改变流程。 (Start --> export table1 --> table2 -->table3 --end),因为由于某些原因,如果任务失败,则需要从失败的位置重新运行任务。因此,即使我要使用单个任务,DAG图也应该相同。

我看到了一种方法from this link),但仍然无法完全理解。

解决方法

只需将通用代码提取到函数中,然后让它为您创建运算符实例即可。

def pg_table_to_gcs(table_name: str) -> PostgresToGoogleCloudStorageOperator:
  return PostgresToGoogleCloudStorageOperator(
      task_id=f"export_{table_name}",postgres_conn_id="cloudsqlpg",google_cloud_storage_conn_id="gcsconn",sql=f"SELECT * FROM {table_name}",export_format="csv",field_delimiter="|",bucket="dsrestoretest",filename=f"file/export_{table_name}/{table_name}.csv",schema_filename=f"file/schema/{table_name}.json",dag=dag)

tables = ["table0","table1","table2"]

with DAG(dag_id="kube_example",default_args=default_args) as dag:
    reduce(lambda t0,t1: t0 >> t1,[pg_table_to_gcs(table,dag) for table in table_names])

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...