问题描述
我正在编写一个气流 dag,它将从数据库中读取一堆配置,然后使用 bash 运算符执行一系列 Python 脚本。之前读取的配置将作为参数传递。
问题是我没有获得与其他下游操作员共享配置的有效方法。我设计了下面的dag。以下是我的担忧。
-
除了每个任务中的配置都相同之外,我不确定每次从数据库中获取它是否是个好主意。 这就是我不想也使用 xcom 的原因。我使用了 airflow 变量,因为 JSON 解析可以在一行中进行。但是,我猜数据库调用问题仍然存在。
class ReturningMysqLOperator(MysqLOperator):
def execute(self,context):
hook = MysqLHook(MysqL_conn_id=self.MysqL_conn_id,schema=self.database)
s = hook.get_pandas_df(sql=self.sql,parameters=self.parameters)
s = s.set_index('laptopName',drop=False)
print(s)
s = s.to_json(orient='index')
Variable.set('jobconfig',s)
t1 = ReturningMysqLOperator(
task_id='MysqL_query',sql='SELECT * FROM laptops',MysqL_conn_id='MysqL_db_temp',dag=dag)
t3 = BashOperator(
task_id='sequence_one',bash_command='python3 path/sequence1.py {{var.json.jobconfig.Legion.laptopName}} {{var.json.jobconfig.Legion.company}}',dag=dag)
t4 = BashOperator(
task_id='sequence_two',bash_command='python3 path/sequence2.py {{var.json.jobconfig.Legion.laptopName}} {{var.json.jobconfig.Legion.company}}',dag=dag)
t5 = BashOperator(
task_id='sequence_three',bash_command='python3 path/sequence3.py {{var.json.jobconfig.Legion.laptopName}} {{var.json.jobconfig.Legion.company}}',dag=dag)
t6 = BashOperator(
task_id='sequence_four',bash_command='python3 path/sequence4.py {{var.json.jobconfig.Legion.laptopName}} {{var.json.jobconfig.Legion.company}}',dag=dag)
t1 >> t3
t3 >> [t4,t6]
解决方法
第一点:
我不确定将进行多少次数据库调用来获取 jinja 模板中所需的值(在下面的示例中)。
在您提供的示例中,您在每个 sequence_x
任务中建立了两个与元数据数据库的连接,每个 {{var.json.jobconfig.xx}}
调用一个。好消息是调度程序不会执行这些操作,因此不会在每个心跳间隔中执行。来自Astronomer guide:
由于 DAG 文件中的所有顶级代码都会在每个调度程序中被解释 “心跳”、宏和模板允许运行时任务 卸载到执行程序而不是调度程序。
第二点:
我认为这里的关键方面是您要向下游传递的值始终相同,并且在您执行 T1
后不会更改。
这里可能有几种方法,但如果您想尽量减少对数据库的调用次数,并完全避免使用 XComs
,则应使用 TriggerDagRunOperator
。
为此,您必须将 DAG 分成两部分,让 控制器 DAG 执行从 MySQL获取数据的任务>,触发第二个 DAG,您使用从控制器 DAG 获得的值执行所有 BashOperator
。您可以使用 conf
参数传入数据。
以下是基于官方 Airflow example DAGs 的示例:
控制器 DAG:
from airflow import DAG
from airflow.models import Variable
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
def _data_from_mysql():
# fetch data from the DB or anywhere else
# set a Variable
data = {'legion': {'company': 'some_company','laptop': 'great_laptop'}}
Variable.set('jobconfig',data,serialize_json=True)
dag = DAG(
dag_id="example_trigger_controller_dag",default_args={"owner": "airflow"},start_date=days_ago(2),schedule_interval="@once",tags=['example'],)
get_data_from_MySql = PythonOperator(
task_id='get_data_from_MySql',python_callable=_data_from_mysql,)
trigger = TriggerDagRunOperator(
task_id="test_trigger_dagrun",# Ensure this equals the dag_id of the DAG to trigger
trigger_dag_id="example_trigger_target_dag",conf={"message": "Company is {{var.json.jobconfig.legion.company}}"},execution_date='{{ds}}',dag=dag,)
get_data_from_MySql >> trigger
当 trigger
任务被执行时,将包含密钥 message
作为第二个 DAG 的 DAG 运行 配置的一部分。
目标 DAG:
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
dag = DAG(
dag_id="example_trigger_target_dag",schedule_interval=None,)
def run_this_func(**context):
"""
Print the payload "message" passed to the DagRun conf attribute.
:param context: The execution context
:type context: dict
"""
print("Remotely received value of {} for key=message".format(
context["dag_run"].conf["message"]))
run_this = PythonOperator(
task_id="run_this",python_callable=run_this_func,dag=dag)
bash_task_1 = BashOperator(
task_id="bash_task_1",bash_command='echo "Here is the message: $message"',env={'message': '{{ dag_run.conf["message"] if dag_run else "" }}'},dag=dag
)
此示例中 bash_task_1
的日志将包括:
[2021-05-05 15:40:35,410] {bash.py:158} INFO - Running command: echo "Here is the message: $message"
[2021-05-05 15:40:35,418] {bash.py:169} INFO - Output:
[2021-05-05 15:40:35,419] {bash.py:173} INFO - Here is the message: Company is some_company
[2021-05-05 15:40:35,420] {bash.py:177} INFO - Command exited with return code 0
回顾:
- 从数据库获取数据并将其设置为
Variable
的一项任务 - 触发第二个 DAG 传递来自
Variable
中的conf
的数据 - 在您的目标 DAG 中使用来自
dag_run.conf
的数据
这样,当第二个 DAG 被触发时,您只从 metadaba DB 读取一次。
此外,为了避免在 BashOperator
任务定义期间重复太多代码,您可以执行以下操作:
templated_bash_cmd = """
python3 {{params.path_to_script}} {{dag_run.conf["laptopName"]}} {{dag_run.conf["company"]}}
"""
bash_task_1 = BashOperator(
task_id="bash_task_1",bash_command=templated_bash_cmd,params={
'path_to_script': 'path/sequence1.py'
},dag=dag
)
如果这对您有用,请告诉我!