如何使用appscript触发Google Composer Airflow dag?

问题描述

我想使用Appscript触发Google Composer气流中断。有什么办法可以通过rest API或其他方式做到这一点。

如果可能,请提出解决方案。

解决方法

Airflow的端点可以通过其REST API触发DAG,但是无法直接访问它,因为Airflow Web服务器位于Cloud Composer architecture内,位于App Engine flexible下方环境。默认情况下,Airflow Web服务器与Identity-Aware Proxy (IAP)集成在一起,并且需要身份验证。

基于此,我在Cloud Composer文档中找到了一个示例,该示例指导您使用Cloud Functions触发DAG,尽管该代码位于JavaScript中,但我认为Google App无法执行该代码脚本。

另一方面,一种解决方法是遵循Triggering DAGs guide如下更改某些设置。

  1. 在创建函数时,不要将触发器类型设置为Cloud Storage,而是将其设置为HTTP,然后检查“允许未经身份验证的调用”以进行测试。将显示一个URL,目标是每次访问该URL时,都会执行DAG。

enter image description here

  1. 修改index.js文件的第一部分,因为将没有数据作为参数传递,也没有makeIapPostRequest函数作为API响应返回。

    exports.triggerDag = async (req,res) => { // Modification
       // Fill in your Composer environment information here.
       // The project that holds your function
       const PROJECT_ID = 'your-project-id';
       // Navigate to your webserver's login page and get this from the URL
       const CLIENT_ID = 'your-iap-client-id';
       // This should be part of your webserver's URL:
       // {tenant-project-id}.appspot.com
       const WEBSERVER_ID = 'your-tenant-project-id';
       // The name of the DAG you wish to trigger
       const DAG_NAME = 'composer_sample_trigger_response_dag';
    
       // Other constants
       const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`;
       const USER_AGENT = 'gcf-event-trigger';
       const BODY = {conf: ‘’}; // Modification
    
       // Make the request
       try {
          const iap = await authorizeIap(CLIENT_ID,PROJECT_ID,USER_AGENT);
    
          const apiReponse = await makeIapPostRequest(WEBSERVER_URL,BODY,iap.idToken,USER_AGENT);  // Modification
          res.status(200).send('DAG_running!');  // Modification
    
       } catch (err) {
          console.error('Error authorizing IAP:',err.message);
          throw new Error(err);
       }
    };
    
    
    const makeIapPostRequest = async (url,body,idToken,userAgent) => {
       const res = await fetch(url,{
          method: 'POST',headers: {
             'User-Agent': userAgent,Authorization: `Bearer ${idToken}`,},body: JSON.stringify(body),});
    
       if (!res.ok) {
          const err = await res.text();
          console.error('Error making IAP post request:',err.message);
          throw new Error(err);
       }
    
       return {
          apiRes: res.ok,// Modification 
       };
    };
    
  2. 这时,必须更改其他任何内容,因此在脚本文件中执行下一条指令以触发DAG。

    function myFunction() {
       var response = UrlFetchApp.fetch("Cloud-function-URL");
       Logger.log(response.getAllHeaders());
    }
    
  3. 最后,在Airflow web interface中验证DAG是否已触发。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...