有没有办法将任务的返回值存储在Python变量中并与下游任务共享不使用xcom或airflow变量 第一点:第二点:控制器 DAG:

问题描述

我正在编写一个气流 dag,它将从数据库中读取一堆配置,然后使用 bash 运算符执行一系列 Python 脚本。之前读取的配置将作为参数传递。

问题是我没有获得与其他下游操作员共享配置的有效方法。我设计了下面的dag。以下是我的担忧。

  1. 我不确定会进行多少次数据库调用获取 jinja 模板中所需的值(在下面的示例中)。

  2. 除了每个任务中的配置都相同之外,我不确定每次从数据库获取它是否是个好主意。 这就是我不想也使用 xcom 的原因。我使用了 airflow 变量,因为 JSON 解析可以在一行中进行。但是,我猜数据库调用问题仍然存在。

class ReturningMysqLOperator(MysqLOperator):
    def execute(self,context):
        hook = MysqLHook(MysqL_conn_id=self.MysqL_conn_id,schema=self.database)
        s = hook.get_pandas_df(sql=self.sql,parameters=self.parameters)
        s = s.set_index('laptopName',drop=False)
        print(s)
        s = s.to_json(orient='index')
        Variable.set('jobconfig',s)



t1 = ReturningMysqLOperator(
    task_id='MysqL_query',sql='SELECT * FROM laptops',MysqL_conn_id='MysqL_db_temp',dag=dag)



t3 = BashOperator(
    task_id='sequence_one',bash_command='python3 path/sequence1.py {{var.json.jobconfig.Legion.laptopName}} {{var.json.jobconfig.Legion.company}}',dag=dag)

t4 = BashOperator(
    task_id='sequence_two',bash_command='python3 path/sequence2.py {{var.json.jobconfig.Legion.laptopName}} {{var.json.jobconfig.Legion.company}}',dag=dag)

t5 = BashOperator(
    task_id='sequence_three',bash_command='python3 path/sequence3.py {{var.json.jobconfig.Legion.laptopName}} {{var.json.jobconfig.Legion.company}}',dag=dag)

t6 = BashOperator(
    task_id='sequence_four',bash_command='python3 path/sequence4.py {{var.json.jobconfig.Legion.laptopName}} {{var.json.jobconfig.Legion.company}}',dag=dag)

t1 >> t3 
t3 >> [t4,t6]

解决方法

第一点:

我不确定将进行多少次数据库调用来获取 jinja 模板中所需的值(在下面的示例中)。

在您提供的示例中,您在每个 sequence_x 任务中建立了两个与元数据数据库的连接,每个 {{var.json.jobconfig.xx}} 调用一个。好消息是调度程序不会执行这些操作,因此不会在每个心跳间隔中执行。来自Astronomer guide

由于 DAG 文件中的所有顶级代码都会在每个调度程序中被解释 “心跳”、宏和模板允许运行时任务 卸载到执行程序而不是调度程序。

第二点:

我认为这里的关键方面是您要向下游传递的值始终相同,并且在您执行 T1 后不会更改。 这里可能有几种方法,但如果您想尽量减少对数据库的调用次数,并完全避免使用 XComs,则应使用 TriggerDagRunOperator

为此,您必须将 DAG 分成两部分,让 控制器 DAG 执行从 MySQL获取数据的任务>,触发第二个 DAG,您使用从控制器 DAG 获得的值执行所有 BashOperator。您可以使用 conf 参数传入数据。

以下是基于官方 Airflow example DAGs 的示例:

控制器 DAG:

from airflow import DAG
from airflow.models import Variable
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago


def _data_from_mysql():
    # fetch data from the DB or anywhere else
    # set a Variable
    data = {'legion': {'company': 'some_company','laptop': 'great_laptop'}}
    Variable.set('jobconfig',data,serialize_json=True)


dag = DAG(
    dag_id="example_trigger_controller_dag",default_args={"owner": "airflow"},start_date=days_ago(2),schedule_interval="@once",tags=['example'],)

get_data_from_MySql = PythonOperator(
    task_id='get_data_from_MySql',python_callable=_data_from_mysql,)

trigger = TriggerDagRunOperator(
    task_id="test_trigger_dagrun",# Ensure this equals the dag_id of the DAG to trigger
    trigger_dag_id="example_trigger_target_dag",conf={"message": "Company is {{var.json.jobconfig.legion.company}}"},execution_date='{{ds}}',dag=dag,)
get_data_from_MySql >> trigger

trigger 任务被执行时,将包含密钥 message 作为第二个 DAGDAG 运行 配置的一部分。

目标 DAG:

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

dag = DAG(
    dag_id="example_trigger_target_dag",schedule_interval=None,)


def run_this_func(**context):
    """
    Print the payload "message" passed to the DagRun conf attribute.

    :param context: The execution context
    :type context: dict
    """
    print("Remotely received value of {} for key=message".format(
        context["dag_run"].conf["message"]))


run_this = PythonOperator(
    task_id="run_this",python_callable=run_this_func,dag=dag)

bash_task_1 = BashOperator(
    task_id="bash_task_1",bash_command='echo "Here is the message: $message"',env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'},dag=dag
)

此示例中 bash_task_1 的日志将包括:

[2021-05-05 15:40:35,410] {bash.py:158} INFO - Running command: echo "Here is the message: $message"
[2021-05-05 15:40:35,418] {bash.py:169} INFO - Output:
[2021-05-05 15:40:35,419] {bash.py:173} INFO - Here is the message: Company is some_company
[2021-05-05 15:40:35,420] {bash.py:177} INFO - Command exited with return code 0

回顾:

  • 从数据库获取数据并将其设置为 Variable 的一项任务
  • 触发第二个 DAG 传递来自 Variable 中的 conf 的数据
  • 在您的目标 DAG 中使用来自 dag_run.conf
  • 的数据

这样,当第二个 DAG 被触发时,您只从 metadaba DB 读取一次。

此外,为了避免在 BashOperator 任务定义期间重复太多代码,您可以执行以下操作:

templated_bash_cmd = """
python3 {{params.path_to_script}} {{dag_run.conf["laptopName"]}} {{dag_run.conf["company"]}}
"""

bash_task_1 = BashOperator(
    task_id="bash_task_1",bash_command=templated_bash_cmd,params={
        'path_to_script': 'path/sequence1.py'
    },dag=dag
)

如果这对您有用,请告诉我!