AWS Fifo 在 Lambda Throttle 上停止

问题描述

我们使用配置了 Fifo Queue 作为 Lambda Functionprocessor。我们使用 MessageGoupIdBatchSize 乐观地删除冗余消息。我们使用 reserved concurrency 来限制速率处理。我们的函数 timeout 必须很高。队列 maximum receives 设置为 10。

观察

当队列中有大量消息时,Lambda 函数会向上扩展。一旦扩展到足以限制,队列处理就会完全停止,直到几分钟后才会处理更多消息。

我假设这是因为节流,因为停止总是与节流同时发生,并且在增加保留并发时,处理停止所需的时间要长得多。

我假设队列再次启动的时间与 lambda retry limit函数 timeout 和队列 visibility timeout 有关。但由于我不知道到底发生了什么,所以这是一个猜测。

问题

没有错误记录,最终所有消息都得到处理,但由于处理对时间和吞吐量敏感,因此让队列暂停几分钟是不可接受的。

问题

发生了什么,我们如何解决这个问题?如果需要更多信息,我很乐意进一步调试。


编辑:发现这个:To allow your function time to process each batch of records,set the source queue's visibility timeout to at least 6 times the timeout that you configure on your function. The extra time allows for Lambda to retry if your function execution is throttled while your function is processing a prevIoUs batch.,我们肯定违反了,但我不确定这如何/是否解释了观察到的行为。

解决方法

如何重现

在此答案的末尾有一个完整的、最小的示例,可以轻松重现该问题。

要部署,请创建所有文件并将您的 aws profile 和所需的 region 填写到所有 sh 文件中。

然后运行

。部署-stack.sh

创建包含所有必要资源的 cloudformation 堆栈。

然后打开AWS Web界面(SQS)并运行

。生成-messages.sh

在队列上生成消息。

然后可以看到大约一半的消息在函数节流和队列完全停止之前被处理。

要在所有调试完成后移除 cloudformation 堆栈,请运行 remove-stack.sh

解决方案

AWS 文档 contains a note

要让您的函数有时间处理每批记录,请将源队列的可见性超时设置为您在函数上配置的超时的至少 6 倍。如果您的函数在处理前一个批次时函数执行受到限制,则额外的时间允许 Lambda 重试。

将 lambda 函数上的 timeout600 更改为 100 并重新部署堆栈允许所有消息正确处理,即使 lambda 函数节流。

我无法解释为什么会观察到这种行为,非常感谢对此提供反馈。但是,以上确实解决了所描述的问题。

文件

stack.yaml

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Debug Stack for Fifo with Lambda Processor
Resources:
  MyLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName:
        Fn::Sub: lambda-role
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Action:
              - sts:AssumeRole
            Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AWSLambdaExecute
        - arn:aws:iam::aws:policy/AmazonSqsFullAccess
      Path: /
  MySqsQueue:
      Type: 'AWS::SQS::Queue'
      Properties:
        FifoQueue: true
        VisibilityTimeout: 600
  MySQSQueueFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.handler
      Role: !GetAtt MyLambdaRole.Arn
      Runtime: nodejs12.x
      Timeout: 600
      ReservedConcurrentExecutions: 5
      Code:
        ZipFile: |
          exports.handler = (event,context) => new Promise((resolve) => {
            setTimeout(resolve,1000);
          });
  MySQSLambdaEventSource:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: false
      EventSourceArn: !GetAtt MySqsQueue.Arn
      FunctionName: !Ref MySQSQueueFunction
Outputs:
  QueueUrl:
    Value:
      Ref: MySqsQueue
  EventSource:
    Value:
      Ref: MySQSLambdaEventSource

deploy-stack.sh

#!/bin/bash

profile=local
region=us-east-1

# -----------------

aws cloudformation deploy \
--profile $profile \
--region $region \
--template-file stack.yaml \
--stack-name fifo-lambda-debug \
--capabilities CAPABILITY_NAMED_IAM

generate-messages.sh

#!/bin/bash

profile=local
region=us-east-1

# -----------------

function genGroupId {
  echo $(shuf -i 1-10 -n 1)
}
function genRndStr {
  echo $(openssl rand -hex 12)
}
function entry {
  echo "{\"Id\":\"$(genRndStr)\",\"MessageBody\":\"$(genRndStr)\",\"MessageGroupId\":\"$(genGroupId)\",\"MessageDeduplicationId\":\"$(genRndStr)\"}"
}

# -----------------

echo "Getting Subscription UUID..."
eventSource=$(aws cloudformation describe-stacks \
--query "Stacks[0].Outputs[?OutputKey=='EventSource'].OutputValue" \
--output text \
--profile $profile \
--region $region \
--stack-name fifo-lambda-debug)

echo "Getting Queue Url..."
queueUrl=$(aws cloudformation describe-stacks \
--query "Stacks[0].Outputs[?OutputKey=='QueueUrl'].OutputValue" \
--output text \
--profile $profile \
--region $region \
--stack-name fifo-lambda-debug)

echo "Disabling Subscription"
aws lambda update-event-source-mapping \
--profile $profile \
--region $region \
--uuid $eventSource \
--no-enabled \
> /dev/null

while : ; do
    echo "Waiting until Subscription disabled..."
    [[ $(aws lambda get-event-source-mapping \
      --profile $profile \
      --region $region \
      --uuid $eventSource \
      --query "State") != '"Disabled"' ]] || break
    sleep 10
done

echo "Queueing Messages..."
for i in {1..30}
do
  aws sqs send-message-batch \
  --profile $profile \
  --region $region \
  --queue-url "$queueUrl" \
  --entries "[$(entry),$(entry),$(entry)]" \
  > /dev/null
  echo "Done: $i / 30"
done

echo "Re-Enabling Subscription..."
aws lambda update-event-source-mapping \
--profile $profile \
--region $region \
--uuid $eventSource \
--enabled \
> /dev/null

remove-stack.sh

#!/bin/bash

profile=local
region=us-east-1

# -----------------

aws cloudformation delete-stack \
--profile $profile \
--region $region \
--stack-name fifo-lambda-debug