如何在触发下一个 Dataflow 作业之前设置我的 Airflow DAG 以完成上一个 DataFlow 任务?

问题描述

我正在 Google Cloud Composer 中从 Airflow 调用 DataFlow 作业,

BufferGeometry

a、b 和 c 是调用数据流作业的任务。 我只想在数据流作业完成后运行 b,问题是它们都同时运行。

我怎样才能等到上一个工作完成?

解决方法

放置作业后,您需要放置一个传感器来验证作业是否完成。

示例:

   start_python_job_async = DataflowCreatePythonJobOperator(
        task_id="start-python-job-async",py_file=GCS_PYTHON,py_options=[],job_name='{{task.task_id}}',options={
            'output': GCS_OUTPUT,},py_requirements=['apache-beam[gcp]==2.25.0'],py_interpreter='python3',py_system_site_packages=False,location='europe-west3',wait_until_finished=False,)

    wait_for_python_job_async_done = DataflowJobStatusSensor(
        task_id="wait-for-python-job-async-done",job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",expected_statuses={DataflowJobStatus.JOB_STATE_DONE},)

    start_python_job_async >> wait_for_python_job_async_done

您可以查看进一步解释的 docsexamples