问题描述
我正在尝试根据 dag_run conf 输入创建多个气流任务。 conf 将有一个值数组,每个值都需要生成一个任务。任务反过来需要将值传递给它的可调用函数。像这样:
#create this task in a loop
task = Pythonoperator(task_id="fetch_data",python_callable=fetch_data(value from array),retries=10)
Conf 的值如下:
{"fruits":["apple","kiwi","orange"]}
我认为可以通过以下方式访问:
kwargs['dag_run'].conf('fruits')
如何在运算符外访问此值,然后在循环中创建运算符?
解决方法
您可以将 PythonOperator 实例化包装在使用值列表的 for 循环中。
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
dag = DAG(
dag_id='fruit_name_printer',start_date=datetime(2021,1,1),schedule_interval='@once'
)
input = [
'apple','orange','banana'
]
def call_func(fruit_name):
print(fruit_name)
with dag:
for fruit in input:
printer = PythonOperator(
task_id=f'print_{fruit}',python_callable=call_func,op_kwargs={
'fruit_name': fruit
}
)