问题描述
下面的 Airflow DAG(Cloud Composer)被消息卡住了:
{base_task_runner.py:113} INFO - Job 5865: Subtask my_task
{gcp_dataflow_hook.py:121} INFO - Running command /home/airflow/gcs/dags/dataflow/pyfile.py --runner DataflowRunner......"
我在 Dataflow 中没有看到提交的作业。知道这里缺少什么吗?
task1 = DataFlowPythonoperator(
task_id = 'my_task',py_file = '/home/airflow/gcs/dags/dataflow/pyfile.py',gcp_conn_id='google_cloud_default',options={
"query" : 'SELECT * from `myproject.myds.mytable',"output" : 'gs://path/',"jobname" : 'my-job'
},dataflow_default_options={
"project": 'my-project',"staging_location": 'gs://path/Staging/',"temp_location": 'gs://path/Temp/',},dag=dag
)