来自 cloud composer 的 Python 数据流作业卡在 Running

问题描述

下面的 Airflow DAG(Cloud Composer)被消息卡住了:

{base_task_runner.py:113} INFO - Job 5865: Subtask my_task 
{gcp_dataflow_hook.py:121} INFO - Running command /home/airflow/gcs/dags/dataflow/pyfile.py --runner DataflowRunner......"

我在 Dataflow 中没有看到提交的作业。知道这里缺少什么吗?

task1 = DataFlowPythonoperator(
task_id = 'my_task',py_file = '/home/airflow/gcs/dags/dataflow/pyfile.py',gcp_conn_id='google_cloud_default',options={
    "query" : 'SELECT * from `myproject.myds.mytable',"output" : 'gs://path/',"jobname" : 'my-job'
},dataflow_default_options={
    "project": 'my-project',"staging_location": 'gs://path/Staging/',"temp_location": 'gs://path/Temp/',},dag=dag
)

解决方法

执行以下操作:

  1. 如果您可以看到您在 Google Cloud Platform Dashboard 提交的作业,请检查 Dataflow 流列表。 enter image description here

  2. 尝试使用相同的命令 /home/airflow/gcs/dags/dataflow/pyfile.py 在本地运行 Python /home/airflow/gcs/dags/dataflow/pyfile.py --runner DataflowRunner...... 脚本。最有可能的拦截器就是这个脚本。

  3. 根据需要传递额外的参数,如 numWorkers,maxNumWorkers,region,worker_zone,etc
    enter image description here