特定批次的气流任务

问题描述

我想运行一组这样的任务:

a >> [b,c,d] >> [e,f,g] >> [h,i,j,k,l,m]

首先运行任务 a,完成后并行运行 b,d,然后当 b,d 中的最后一个完成。开始并行运行 e,g 等

但是我收到一个错误不支持的操作数类型为 >>: 'list' 和 'list'

我想要做什么的正确语法是什么?

解决方法

您得到的错误与不支持使用按位运算符的列表之间的依赖关系有关,[task_a,task_b] >> [task_c,task_d] 将不起作用。

恕我直言,实现您正在寻找的东西(还有其他方法)的最简单、更简洁的方法是使用 TaskGroup 并在它们之间设置依赖关系,如下所示:

图表视图:

Graph view

from time import sleep
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.operators.python import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.task_group import TaskGroup

default_args = {
    'start_date':  days_ago(1)
}


def _execute_task(**kwargs):
    print(f"Task_id: {kwargs['ti'].task_id}")
    sleep(10)


def _create_python_task(name):
    return PythonOperator(
        task_id=f'task_{name}',python_callable=_execute_task)


with DAG('parallel_tasks_example',schedule_interval='@once',default_args=default_args,catchup=False) as dag:

    task_a = DummyOperator(task_id='task_a')

    with TaskGroup('first_group') as first_group:

        for name in list('bcd'):
            task = _create_python_task(name)

    with TaskGroup('second_group') as second_group:

        for name in list('efg'):
            task = _create_python_task(name)

    with TaskGroup('third_group') as third_group:

        for name in list('hijk'):
            task = _create_python_task(name)

task_a >> first_group >> second_group >> third_group

来自 TaskGroup 类定义:

任务集合。当 set_downstream() 或 set_upstream() 被调用时 任务组,如有必要,它将应用于组内的所有任务。

您可以找到有关 here 的官方示例。