问题描述
我想运行一组这样的任务:
a >> [b,c,d] >> [e,f,g] >> [h,i,j,k,l,m]
首先运行任务 a,完成后并行运行 b,d,然后当 b,d 中的最后一个完成。开始并行运行 e,g 等
但是我收到一个错误,不支持的操作数类型为 >>: 'list' 和 'list'
我想要做什么的正确语法是什么?
解决方法
您得到的错误与不支持使用按位运算符的列表之间的依赖关系有关,[task_a,task_b] >> [task_c,task_d]
将不起作用。
恕我直言,实现您正在寻找的东西(还有其他方法)的最简单、更简洁的方法是使用 TaskGroup
并在它们之间设置依赖关系,如下所示:
图表视图:
from time import sleep
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.task_group import TaskGroup
default_args = {
'start_date': days_ago(1)
}
def _execute_task(**kwargs):
print(f"Task_id: {kwargs['ti'].task_id}")
sleep(10)
def _create_python_task(name):
return PythonOperator(
task_id=f'task_{name}',python_callable=_execute_task)
with DAG('parallel_tasks_example',schedule_interval='@once',default_args=default_args,catchup=False) as dag:
task_a = DummyOperator(task_id='task_a')
with TaskGroup('first_group') as first_group:
for name in list('bcd'):
task = _create_python_task(name)
with TaskGroup('second_group') as second_group:
for name in list('efg'):
task = _create_python_task(name)
with TaskGroup('third_group') as third_group:
for name in list('hijk'):
task = _create_python_task(name)
task_a >> first_group >> second_group >> third_group
来自 TaskGroup
类定义:
任务集合。当 set_downstream() 或 set_upstream() 被调用时 任务组,如有必要,它将应用于组内的所有任务。
您可以找到有关 here 的官方示例。