使用DataProcPySparkOperator时无法配置GCP项目

问题描述

我正在使用Cloud Composer环境在GCP项目中运行工作流程。我的工作流程之一是使用DataprocClusterCreateOperator在不同的项目中创建一个Dataproc集群,然后尝试使用DataProcPySparkOperator模块中的airflow.contrib.operators.dataproc_operator将PySpark作业提交到该集群。

要创建集群,我可以指定一个project_id参数在另一个项目中创建它,但是似乎DataProcPySparkOperator会忽略此参数。例如,我希望能够传递project_id,但是当任务运行时,我最终会遇到404错误

from airflow.contrib.operators.dataproc_operator import DataProcpySparkOperator

t1 = DataProcpySparkOperator(
  project_id='my-gcp-project',main='...',arguments=[...],)

如何使用DataProcPySparkOperator提交另一个项目中的工作?

解决方法

DataProcPySparkOperator模块中的airflow.contrib.operators.dataproc_operator在其构造函数中不接受project_id kwarg,因此它将始终默认在Cloud Composer环境所在的项目中提交Dataproc作业。如果传递了参数,则将其忽略,这将导致在运行任务时出现404错误,因为操作员将尝试使用错误的群集路径来轮询作业。

一种解决方法是复制运算符和挂钩,然后对其进行修改以接受项目ID。但是,一个更简单的解决方案是使用airflow.providers软件包中较新的运算符(如果您使用的是支持它们的Airflow版本),因为在较新的Airflow版本中不推荐使用许多airflow.contrib运算符。

下面是一个示例。请注意,此模块中有一个更新的DataprocSubmitPySparkJobOperator,但不推荐使用DataprocSubmitJobOperator。因此,您应该使用后者,它接受项目ID。

from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator

t1 = DataprocSubmitJobOperator(
  project_id='my-gcp-project-id',location='us-central1',job={...},)

如果您使用Composer 1.10.5 +,Airflow 1.10.6+和Python 3运行环境,则将预先安装提供程序,并可以立即使用它们。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...