问题描述
我想使用Appscript触发Google Composer气流中断。有什么办法可以通过rest API或其他方式做到这一点。
如果可能,请提出解决方案。
解决方法
Airflow的端点可以通过其REST API触发DAG,但是无法直接访问它,因为Airflow Web服务器位于Cloud Composer architecture内,位于App Engine flexible下方环境。默认情况下,Airflow Web服务器与Identity-Aware Proxy (IAP)集成在一起,并且需要身份验证。
基于此,我在Cloud Composer文档中找到了一个示例,该示例指导您使用Cloud Functions触发DAG,尽管该代码位于JavaScript中,但我认为Google App无法执行该代码脚本。
另一方面,一种解决方法是遵循Triggering DAGs guide如下更改某些设置。
- 在创建函数时,不要将触发器类型设置为Cloud Storage,而是将其设置为HTTP,然后检查“允许未经身份验证的调用”以进行测试。将显示一个URL,目标是每次访问该URL时,都会执行DAG。
-
修改index.js文件的第一部分,因为将没有数据作为参数传递,也没有makeIapPostRequest函数作为API响应返回。
exports.triggerDag = async (req,res) => { // Modification // Fill in your Composer environment information here. // The project that holds your function const PROJECT_ID = 'your-project-id'; // Navigate to your webserver's login page and get this from the URL const CLIENT_ID = 'your-iap-client-id'; // This should be part of your webserver's URL: // {tenant-project-id}.appspot.com const WEBSERVER_ID = 'your-tenant-project-id'; // The name of the DAG you wish to trigger const DAG_NAME = 'composer_sample_trigger_response_dag'; // Other constants const WEBSERVER_URL = `https://${WEBSERVER_ID}.appspot.com/api/experimental/dags/${DAG_NAME}/dag_runs`; const USER_AGENT = 'gcf-event-trigger'; const BODY = {conf: ‘’}; // Modification // Make the request try { const iap = await authorizeIap(CLIENT_ID,PROJECT_ID,USER_AGENT); const apiReponse = await makeIapPostRequest(WEBSERVER_URL,BODY,iap.idToken,USER_AGENT); // Modification res.status(200).send('DAG_running!'); // Modification } catch (err) { console.error('Error authorizing IAP:',err.message); throw new Error(err); } }; const makeIapPostRequest = async (url,body,idToken,userAgent) => { const res = await fetch(url,{ method: 'POST',headers: { 'User-Agent': userAgent,Authorization: `Bearer ${idToken}`,},body: JSON.stringify(body),}); if (!res.ok) { const err = await res.text(); console.error('Error making IAP post request:',err.message); throw new Error(err); } return { apiRes: res.ok,// Modification }; };
-
这时,必须更改其他任何内容,因此在脚本文件中执行下一条指令以触发DAG。
function myFunction() { var response = UrlFetchApp.fetch("Cloud-function-URL"); Logger.log(response.getAllHeaders()); }
-
最后,在Airflow web interface中验证DAG是否已触发。