问题描述
我们使用配置了 Fifo Queue 作为 Lambda Function 的 processor。我们使用 MessageGoupId 和 BatchSize 乐观地删除冗余消息。我们使用 reserved concurrency 来限制速率处理。我们的函数 timeout 必须很高。队列 maximum receives 设置为 10。
观察
当队列中有大量消息时,Lambda 函数会向上扩展。一旦扩展到足以限制,队列处理就会完全停止,直到几分钟后才会处理更多消息。
我假设这是因为节流,因为停止总是与节流同时发生,并且在增加保留并发时,处理停止所需的时间要长得多。
我假设队列再次启动的时间与 lambda retry limit、函数 timeout 和队列 visibility timeout 有关。但由于我不知道到底发生了什么,所以这是一个猜测。
问题
没有错误记录,最终所有消息都得到处理,但由于处理对时间和吞吐量敏感,因此让队列暂停几分钟是不可接受的。
问题
发生了什么,我们如何解决这个问题?如果需要更多信息,我很乐意进一步调试。
编辑:发现这个:To allow your function time to process each batch of records,set the source queue's visibility timeout to at least 6 times the timeout that you configure on your function. The extra time allows for Lambda to retry if your function execution is throttled while your function is processing a prevIoUs batch.
,我们肯定违反了,但我不确定这如何/是否解释了观察到的行为。
解决方法
如何重现
在此答案的末尾有一个完整的、最小的示例,可以轻松重现该问题。
要部署,请创建所有文件并将您的 aws profile
和所需的 region
填写到所有 sh
文件中。
然后运行
。部署-stack.sh
创建包含所有必要资源的 cloudformation 堆栈。
然后打开AWS Web界面(SQS)并运行
。生成-messages.sh
在队列上生成消息。
然后可以看到大约一半的消息在函数节流和队列完全停止之前被处理。
要在所有调试完成后移除 cloudformation 堆栈,请运行 remove-stack.sh
解决方案
AWS 文档 contains a note 说
要让您的函数有时间处理每批记录,请将源队列的可见性超时设置为您在函数上配置的超时的至少 6 倍。如果您的函数在处理前一个批次时函数执行受到限制,则额外的时间允许 Lambda 重试。
将 lambda 函数上的 timeout
从 600
更改为 100
并重新部署堆栈允许所有消息正确处理,即使 lambda 函数节流。
我无法解释为什么会观察到这种行为,非常感谢对此提供反馈。但是,以上确实解决了所描述的问题。
文件
stack.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Debug Stack for Fifo with Lambda Processor
Resources:
MyLambdaRole:
Type: AWS::IAM::Role
Properties:
RoleName:
Fn::Sub: lambda-role
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Action:
- sts:AssumeRole
Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
ManagedPolicyArns:
- arn:aws:iam::aws:policy/AWSLambdaExecute
- arn:aws:iam::aws:policy/AmazonSqsFullAccess
Path: /
MySqsQueue:
Type: 'AWS::SQS::Queue'
Properties:
FifoQueue: true
VisibilityTimeout: 600
MySQSQueueFunction:
Type: AWS::Lambda::Function
Properties:
Handler: index.handler
Role: !GetAtt MyLambdaRole.Arn
Runtime: nodejs12.x
Timeout: 600
ReservedConcurrentExecutions: 5
Code:
ZipFile: |
exports.handler = (event,context) => new Promise((resolve) => {
setTimeout(resolve,1000);
});
MySQSLambdaEventSource:
Type: AWS::Lambda::EventSourceMapping
Properties:
BatchSize: 1
Enabled: false
EventSourceArn: !GetAtt MySqsQueue.Arn
FunctionName: !Ref MySQSQueueFunction
Outputs:
QueueUrl:
Value:
Ref: MySqsQueue
EventSource:
Value:
Ref: MySQSLambdaEventSource
deploy-stack.sh
#!/bin/bash
profile=local
region=us-east-1
# -----------------
aws cloudformation deploy \
--profile $profile \
--region $region \
--template-file stack.yaml \
--stack-name fifo-lambda-debug \
--capabilities CAPABILITY_NAMED_IAM
generate-messages.sh
#!/bin/bash
profile=local
region=us-east-1
# -----------------
function genGroupId {
echo $(shuf -i 1-10 -n 1)
}
function genRndStr {
echo $(openssl rand -hex 12)
}
function entry {
echo "{\"Id\":\"$(genRndStr)\",\"MessageBody\":\"$(genRndStr)\",\"MessageGroupId\":\"$(genGroupId)\",\"MessageDeduplicationId\":\"$(genRndStr)\"}"
}
# -----------------
echo "Getting Subscription UUID..."
eventSource=$(aws cloudformation describe-stacks \
--query "Stacks[0].Outputs[?OutputKey=='EventSource'].OutputValue" \
--output text \
--profile $profile \
--region $region \
--stack-name fifo-lambda-debug)
echo "Getting Queue Url..."
queueUrl=$(aws cloudformation describe-stacks \
--query "Stacks[0].Outputs[?OutputKey=='QueueUrl'].OutputValue" \
--output text \
--profile $profile \
--region $region \
--stack-name fifo-lambda-debug)
echo "Disabling Subscription"
aws lambda update-event-source-mapping \
--profile $profile \
--region $region \
--uuid $eventSource \
--no-enabled \
> /dev/null
while : ; do
echo "Waiting until Subscription disabled..."
[[ $(aws lambda get-event-source-mapping \
--profile $profile \
--region $region \
--uuid $eventSource \
--query "State") != '"Disabled"' ]] || break
sleep 10
done
echo "Queueing Messages..."
for i in {1..30}
do
aws sqs send-message-batch \
--profile $profile \
--region $region \
--queue-url "$queueUrl" \
--entries "[$(entry),$(entry),$(entry)]" \
> /dev/null
echo "Done: $i / 30"
done
echo "Re-Enabling Subscription..."
aws lambda update-event-source-mapping \
--profile $profile \
--region $region \
--uuid $eventSource \
--enabled \
> /dev/null
remove-stack.sh
#!/bin/bash
profile=local
region=us-east-1
# -----------------
aws cloudformation delete-stack \
--profile $profile \
--region $region \
--stack-name fifo-lambda-debug