异步 Lambda 函数:返回承诺或发送 responseURL 不会终止 CloudFormation 自定义资源调用

问题描述

我有一个通过 CloudFormation 模板作为自定义资源调用的 lambda 函数。它创建/删除 AWS Connect 实例。 API 调用工作正常,但我似乎无法终止自定义资源调用,因此最后一个 CF 块仍然是 CREATE_IN_PROGRESS。无论我从异步函数返回什么,它都不会成功终止 CF 执行。

我能够像 https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/walkthrough-custom-resources-lambda-lookup-amiids.html 一样成功使用非异步处理程序,但我需要进行多个 API 调用并等待完成,因此需要异步处理程序。

下面是最简单形式的代码,尽管我已经尝试了几乎所有的方法包括使用回调和上下文(即exports.handler = async function(event,context,callback) {...}),两者都是对于异步处理程序,这应该是不必要的。我试过使用 cfn-response 直接发送一个似乎被异步处理程序忽略的响应。我试过直接返回带有和不带有等待的承诺,尝试返回包含各种 responseStatus 和 responseData 的变量,但似乎没有任何效果

Transform: 'AWS::Serverless-2016-10-31'
Parameters:
  IdentityManagementType:
    Description: The type of identity management for your Amazon Connect users.
    Type: String
    AllowedValues: ["SAML","CONNECT_MANAGED","EXISTING_DIRECTORY"]
    Default: "SAML"
  InboundCallsEnabled:
    Description: Whether your contact center handles incoming contacts.
    Type: String
    AllowedValues: [true,false]
    Default: true
  InstanceAlias:
    Description: The name for your instance.
    Type: String
    MaxLength: 62
  OutboundCallsEnabled:
    Description: Whether your contact center allows outbound calls.
    Type: String
    AllowedValues: [true,false]
    Default: true
  DirectoryId:
    Description: Optional. The identifier for the directory,if using this type of Identity Management.
    Type: String
  ClientToken:
    Description: Optional. The idempotency token. Used for concurrent deployments
    Type: String
    MaxLength: 500
  Region:
    Description: Region to place the AWS Connect Instance
    Type: String
    Default: us-east-1
#Handler for optional values
Conditions:
  HasClientToken: !Not
    - !Equals
      - ""
      - !Ref ClientToken
  HasDirectoryId: !Not
    - !Equals
      - ""
      - !Ref DirectoryId

Resources:
  CreateConnectInstance:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: !Sub "${AWS::StackName}-AWSConnectInstance"
      Handler: index.handler
      Runtime: nodejs12.x
      Description: Invoke a function to create an AWS Connect instance.
      MemorySize: 128
      Timeout: 30
      Role: !GetAtt LambdaExecutionRole.Arn
      Layers:
        - !Sub "arn:aws:lambda:us-east-1:${AWS::AccountId}:layer:node_sdk:1"
      Environment:
        Variables:
          IdentityManagementType:
            Ref: IdentityManagementType
          InboundCallsEnabled:
            Ref: InboundCallsEnabled
          InstanceAlias:
            Ref: InstanceAlias
          OutboundCallsEnabled:
            Ref: OutboundCallsEnabled
          Region:
            Ref: Region
          #Optional Values
          ClientToken: !If
            - HasClientToken
            - !Ref ClientToken
            - !Ref "AWS::Novalue"
          DirectoryId: !If
            - HasClientToken
            - !Ref ClientToken
            - !Ref "AWS::Novalue"
      InlineCode: |
        var aws = require("aws-sdk");
        exports.handler = async function(event) {
            console.log("REQUEST RECEIVED:\n" + JSON.stringify(event));
            var connect = new aws.Connect({region: event.ResourceProperties.Region});
            var isInboundCallsEnabled = (process.env.InboundCallsEnabled == 'true');
            var isOutboundCallsEnabled = (process.env.OutboundCallsEnabled == 'true');
            var createInstanceParams = {
                InboundCallsEnabled: isInboundCallsEnabled,OutboundCallsEnabled: isOutboundCallsEnabled,IdentityManagementType: process.env.IdentityManagementType,ClientToken: process.env.ClientToken,DirectoryId: process.env.DirectoryId,InstanceAlias: process.env.InstanceAlias
            };

            // Create AWS Connect instance using specified parameters
            if (event.RequestType == "Create") {
                return await connect.createInstance(createInstanceParams).promise();
                // I can store this in a variable and read the contents fine,but...
                // returning the promise does not terminate execution
            }
        };


  InvokeCreateConnectInstance:
    Type: Custom::CreateConnectInstance
    Properties:
      Servicetoken: !GetAtt CreateConnectInstance.Arn
      Region: !Ref "AWS::Region"

https://docs.aws.amazon.com/lambda/latest/dg/nodejs-handler.html 处的文档明确指出您应该能够直接从任何异步函数返回 await apiCall.promise(),这正是我想要做的,例如

const s3 = new AWS.S3()

exports.handler = async function(event) {
  return s3.listBuckets().promise()
}

为什么我不能从异步函数返回? API 调用再次正常工作,Connect 实例被创建和删除(尽管我为了简洁起见省略了删除代码),但 CF 只是挂了几个小时,直到最终说“自定义资源未能在预期时间内稳定”

为了可读性,这里是内联代码本身:

        exports.handler = async function(event) {
            console.log("REQUEST RECEIVED:\n" + JSON.stringify(event));
            var connect = new aws.Connect({region: event.ResourceProperties.Region});
            var isInboundCallsEnabled = (process.env.InboundCallsEnabled == 'true');
            var isOutboundCallsEnabled = (process.env.OutboundCallsEnabled == 'true');
            var createInstanceParams = {
                InboundCallsEnabled: isInboundCallsEnabled,but...
                // returning the promise does not terminate CF execution
            }
          };

更新:我已经完全按照 AMI 查找示例(第一个链接)中所示实现了 sendResponse 方法,并且正在发送完全正确的响应结构,它甚至在数据字段中包含新创建的连接实例 ID:

{
    "Status": "SUCCESS","Reason": "See the details in CloudWatch Log Stream: 2020/12/23/[$LATEST]6fef3553870b4fba90479a37b4360cee","PhysicalResourceId": "2020/12/23/[$LATEST]6fef3553870b4fba90479a37b4360cee","StackId": "arn:aws:cloudformation:us-east-1:642608065726:stack/cr12/1105a290-4534-11eb-a6de-0a8534d05dcd","RequestId": "2f7c3d9e-941f-402c-b739-d2d965288cfe","LogicalResourceId": "InvokeCreateConnectInstance","Data": {
        "InstanceId": "2ca7aa49-9b20-4feb-8073-5f23d63e4cbc"
    }
}

并且仍然不会在 CloudFormation 中关闭自定义资源。我只是不明白为什么当我将上述内容返回到 event.responseURL 时会发生这种情况。这就像指定一个异步处理程序完全破坏了自定义资源处理程序并阻止它关闭

更新:当我手动将上述响应直接卷曲到 event.responseUrl 时,CF 资源注册成功! WTF...我发送的响应与 lambda 函数发送的响应完全相同,它从 CURL 接受它,但不从我的 lambda 函数接受它。

UPDATE:最新代码包括sendResponse等

var aws = require("aws-sdk");
exports.handler = async function(event,callback) {
    console.log("REQUEST RECEIVED:\n" + JSON.stringify(event));
    var connect = new aws.Connect({region: event.ResourceProperties.Region});
    var isInboundCallsEnabled = (process.env.InboundCallsEnabled == 'true');
    var isOutboundCallsEnabled = (process.env.OutboundCallsEnabled == 'true');
    var createInstanceParams = {
        InboundCallsEnabled: isInboundCallsEnabled,InstanceAlias: process.env.InstanceAlias
    };
    var responseStatus;
    var responseData = {};

    // Create Connect instance
    if (event.RequestType == "Create") {
        try {
            var createInstanceRequest = await connect.createInstance(createInstanceParams).promise();
            responseStatus = "SUCCESS";
            responseData = {"InstanceId": createInstanceRequest.Id};
        } catch (err) {
            responseStatus = "Failed";
            responseData = {Error: "CreateInstance Failed"};
            console.log(responseData.Error + ":\n",err);
        }
        sendResponse(event,responseStatus,responseData);
        return;
    }

    // Look up the ID and call deleteInstance.
    if (event.RequestType == "Delete") {
        var instanceId;
        var listInstanceRequest = await connect.listInstances({}).promise();
        listInstanceRequest.InstanceSummaryList.forEach(instance => {
            if (instance.InstanceAlias == createInstanceParams.InstanceAlias) {
                instanceId = instance.Id;
            }
        });
        if (instanceId !== undefined) {
            try {
                var deleteInstanceRequest = await connect.deleteInstance({"InstanceId": instanceId}).promise();
                responseStatus = "SUCCESS";
                responseData = {"InstanceId": instanceId};
            } catch (err) {
                responseStatus = "Failed";
                responseData = {Error: "DeleteInstance call Failed"};
                console.log(responseData.Error + ":\n",err);
            }
        } else {
            responseStatus = "Failed";
            responseData = {Error: "DeleteInstance Failed; no match found"};
            console.log(responseData.Error);
        }
        sendResponse(event,responseData);
        return;
    }
};

// Send response to the pre-signed S3 URL 
function sendResponse(event,responseData) {
    var responseBody = JSON.stringify({
        Status: responseStatus,Reason: "CloudWatch Log Stream: " + context.logStreamName,PhysicalResourceId: context.logStreamName,StackId: event.StackId,RequestId: event.RequestId,LogicalResourceId: event.LogicalResourceId,Data: responseData
    });
    console.log("RESPONSE BODY:\n",responseBody);
    var https = require("https");
    var url = require("url");
    var parsedUrl = url.parse(event.ResponseURL);
    var options = {
        hostname: parsedUrl.hostname,port: 443,path: parsedUrl.path,method: "PUT",headers: {
            "content-type": "","content-length": responseBody.length
        }
    };
    console.log("SENDING RESPONSE...\n");
    var request = https.request(options,function(response) {
        console.log("STATUS: " + response.statusCode);
        console.log("HEADERS: " + JSON.stringify(response.headers));
        // Tell AWS Lambda that the function execution is done  
        context.done();
    });
    request.on("error",function(error) {
        console.log("sendResponse Error:" + error);
        // Tell AWS Lambda that the function execution is done  
        context.done();
    });
    // write data to request body
    request.write(responseBody);
    request.end();
}

已经在这两天了:(

PS 在日志中,“RESPONSE BODY”按预期显示,就像我上面复制的一样,日志显示“SENDING RESPONSE”,但没有进入“STATUS:”和“HEADERS” : " request.https() 调用的一部分,这让我觉得异步会干扰这个调用...... IDK

解决方法

这个真的很棘手,但最终一切都想通了。我必须通过向其添加承诺,等待该承诺并返回它来使 sendResponse 函数异步。这让我最终可以调用“return await sendResponse(event,context,responseStatus,responseData);”最后一切正常,创建和删除操作都成功,CloudFormation 自定义资源按预期完成。呼。在此发布代码,希望其他人能从中受益。

var aws = require("aws-sdk");
exports.handler = async function(event,callback) {
    console.log("REQUEST RECEIVED:\n" + JSON.stringify(event));
    var connect = new aws.Connect({region: event.ResourceProperties.Region});
    var isInboundCallsEnabled = (process.env.InboundCallsEnabled == 'true');
    var isOutboundCallsEnabled = (process.env.OutboundCallsEnabled == 'true');
    var createInstanceParams = {
        InboundCallsEnabled: isInboundCallsEnabled,OutboundCallsEnabled: isOutboundCallsEnabled,IdentityManagementType: process.env.IdentityManagementType,ClientToken: process.env.ClientToken,DirectoryId: process.env.DirectoryId,InstanceAlias: process.env.InstanceAlias
    };
    var responseStatus;
    var responseData = {};
    if (event.RequestType == "Create") {
        try {
            var createInstanceRequest = await connect.createInstance(createInstanceParams).promise();
            responseStatus = "SUCCESS";
            responseData = {"InstanceId": createInstanceRequest.Id};
        } catch (err) {
            responseStatus = "FAILED";
            responseData = {Error: "CreateInstance failed"};
            console.log(responseData.Error + ":\n",err);
        }
        return await sendResponse(event,responseData);
    }

    if (event.RequestType == "Delete") {
        var instanceId;
        var listInstanceRequest = await connect.listInstances({}).promise();
        listInstanceRequest.InstanceSummaryList.forEach(instance => {
            if (instance.InstanceAlias == createInstanceParams.InstanceAlias) {
                instanceId = instance.Id;
            }
        });
        if (instanceId !== undefined) {
            try {
                var deleteInstanceRequest = await connect.deleteInstance({"InstanceId": instanceId}).promise();
                responseStatus = "SUCCESS";
                responseData = {"InstanceId": instanceId};
            } catch (err) {
                responseStatus = "FAILED";
                responseData = {Error: "DeleteInstance call failed"};
                console.log(responseData.Error + ":\n",err);
            }
        } else {
            responseStatus = "FAILED";
            responseData = {Error: "DeleteInstance failed; no match found"};
            console.log(responseData.Error);
        }
        return await sendResponse(event,responseData);
    }
};

async function sendResponse(event,responseData) {
    let responsePromise = new Promise((resolve,reject) => {
        var responseBody = JSON.stringify({
            Status: responseStatus,Reason: "CloudWatch Log Stream: " + context.logStreamName,PhysicalResourceId: context.logStreamName,StackId: event.StackId,RequestId: event.RequestId,LogicalResourceId: event.LogicalResourceId,Data: responseData
        });
        console.log("RESPONSE BODY:\n",responseBody);
        var https = require("https");
        var url = require("url");
        var parsedUrl = url.parse(event.ResponseURL);
        var options = {
            hostname: parsedUrl.hostname,port: 443,path: parsedUrl.path,method: "PUT",headers: {
                "content-type": "","content-length": responseBody.length
            }
        };
        console.log("SENDING RESPONSE...\n");
        var request = https.request(options,function(response) {
            console.log("STATUS: " + response.statusCode);
            console.log("HEADERS: " + JSON.stringify(response.headers));
            resolve(JSON.parse(responseBody));
            context.done();
        });
        request.on("error",function(error) {
            console.log("sendResponse Error:" + error);
            reject(error);
            context.done();
        });
        request.write(responseBody);
        request.end();
    });
    return await responsePromise;
}
,

此答案是针对在 CloudFormation 中的 "Code" propertyAWS::Lambda::Function resource 中使用“ZipFile”选项的人的 OP 答案的变体。 ZipFile 方法的优势在于,除了允许将 Lambda 代码内联到 CF 模板之外,它还自动捆绑了一个“cfn-response.js”函数,非常类似于 OP 答案中的“异步函数 sendResponse”。通过从 OP 对承诺响应的回答中获得的洞察力(谢谢,我被卡住了并感到困惑),这就是我如何将 cfn-response 函数合并为一个可等待的承诺,以在我的异步 AWS API 调用后向 CF 发出信号(省略)为简洁起见)已完成:

CreateSnapshotFunction:
    Type: AWS::Lambda::Function
    Properties:
        Runtime: nodejs12.x
        Handler: index.handler
        Timeout: 900 # 15 mins
        Code:
            ZipFile: !Sub |
                const resp = require('cfn-response');
                const aws = require('aws-sdk');
                const cf = new aws.CloudFormation({apiVersion: '2010-05-15'});
                const rds = new aws.RDS({apiVersion: '2014-10-31'});

                exports.handler = async function(evt,ctx) {
                    if (evt.RequestType == "Create") {
                        try {
                            // Query the given CF stack,determine its database
                            // identifier,create a snapshot of the database,// and await an "available" status for the snapshot
                            let stack = await getStack(stackNameSrc);
                            let srcSnap = await createSnapshot(stack);
                            let pollFn = () => describeSnapshot(srcSnap.DBSnapshot.DBSnapshotIdentifier);
                            let continueFn = snap => snap.DBSnapshots[0].Status !== 'available';
                            await poll(pollFn,continueFn,10,89); // timeout after 14 min,50 sec

                            // Send response to CF
                            await send(evt,ctx,resp.SUCCESS,{
                                SnapshotId: srcSnap.DBSnapshot.DBSnapshotIdentifier,UpgradeRequired: upgradeRequired
                            });
                        } catch(err) {
                            await send(evt,resp.FAILED,{ ErrorMessage: err } );
                        }
                    } else {
                        // Send success to CF for delete and update requests
                        await send(evt,{});
                    }
                };

                function send(evt,status,data) {
                    return new Promise(() => { resp.send(evt,data) });
                }