问题描述
我正在 Google Cloud Composer 中从 Airflow 调用 DataFlow 作业,
BufferGeometry
a、b 和 c 是调用数据流作业的任务。 我只想在数据流作业完成后运行 b,问题是它们都同时运行。
我怎样才能等到上一个工作完成?
解决方法
放置作业后,您需要放置一个传感器来验证作业是否完成。
示例:
start_python_job_async = DataflowCreatePythonJobOperator(
task_id="start-python-job-async",py_file=GCS_PYTHON,py_options=[],job_name='{{task.task_id}}',options={
'output': GCS_OUTPUT,},py_requirements=['apache-beam[gcp]==2.25.0'],py_interpreter='python3',py_system_site_packages=False,location='europe-west3',wait_until_finished=False,)
wait_for_python_job_async_done = DataflowJobStatusSensor(
task_id="wait-for-python-job-async-done",job_id="{{task_instance.xcom_pull('start-python-job-async')['job_id']}}",expected_statuses={DataflowJobStatus.JOB_STATE_DONE},)
start_python_job_async >> wait_for_python_job_async_done