问题描述
我正在使用Cloud Composer环境在GCP项目中运行工作流程。我的工作流程之一是使用DataprocClusterCreateOperator
在不同的项目中创建一个Dataproc集群,然后尝试使用DataProcPySparkOperator
模块中的airflow.contrib.operators.dataproc_operator
将PySpark作业提交到该集群。
要创建集群,我可以指定一个project_id
参数在另一个项目中创建它,但是似乎DataProcPySparkOperator
会忽略此参数。例如,我希望能够传递project_id
,但是当任务运行时,我最终会遇到404
错误:
from airflow.contrib.operators.dataproc_operator import DataProcpySparkOperator
t1 = DataProcpySparkOperator(
project_id='my-gcp-project',main='...',arguments=[...],)
如何使用DataProcPySparkOperator
提交另一个项目中的工作?
解决方法
DataProcPySparkOperator
模块中的airflow.contrib.operators.dataproc_operator
在其构造函数中不接受project_id
kwarg,因此它将始终默认在Cloud Composer环境所在的项目中提交Dataproc作业。如果传递了参数,则将其忽略,这将导致在运行任务时出现404错误,因为操作员将尝试使用错误的群集路径来轮询作业。
一种解决方法是复制运算符和挂钩,然后对其进行修改以接受项目ID。但是,一个更简单的解决方案是使用airflow.providers
软件包中较新的运算符(如果您使用的是支持它们的Airflow版本),因为在较新的Airflow版本中不推荐使用许多airflow.contrib
运算符。
下面是一个示例。请注意,此模块中有一个更新的DataprocSubmitPySparkJobOperator
,但不推荐使用DataprocSubmitJobOperator
。因此,您应该使用后者,它接受项目ID。
from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator
t1 = DataprocSubmitJobOperator(
project_id='my-gcp-project-id',location='us-central1',job={...},)
如果您使用Composer 1.10.5 +,Airflow 1.10.6+和Python 3运行环境,则将预先安装提供程序,并可以立即使用它们。